Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
46574c38
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
46574c38
编写于
8月 17, 2023
作者:
Y
yihaoDeng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix mem leak
上级
58c0a3d8
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
54 addition
and
46 deletion
+54
-46
source/dnode/mnode/impl/src/mndStream.c
source/dnode/mnode/impl/src/mndStream.c
+48
-43
source/dnode/mnode/sdb/src/sdbRaw.c
source/dnode/mnode/sdb/src/sdbRaw.c
+1
-1
source/libs/stream/src/streamBackendRocksdb.c
source/libs/stream/src/streamBackendRocksdb.c
+5
-2
未找到文件。
source/dnode/mnode/impl/src/mndStream.c
浏览文件 @
46574c38
...
@@ -26,8 +26,8 @@
...
@@ -26,8 +26,8 @@
#include "mndUser.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "mndVgroup.h"
#include "parser.h"
#include "parser.h"
#include "tname.h"
#include "tmisce.h"
#include "tmisce.h"
#include "tname.h"
#define MND_STREAM_VER_NUMBER 3
#define MND_STREAM_VER_NUMBER 3
#define MND_STREAM_RESERVE_SIZE 64
#define MND_STREAM_RESERVE_SIZE 64
...
@@ -35,14 +35,14 @@
...
@@ -35,14 +35,14 @@
typedef
struct
SNodeEntry
{
typedef
struct
SNodeEntry
{
int32_t
nodeId
;
int32_t
nodeId
;
SEpSet
epset
;
// compare the epset to identify the vgroup tranferring between different dnodes.
SEpSet
epset
;
// compare the epset to identify the vgroup tranferring between different dnodes.
int64_t
hbTimestamp
;
// second
int64_t
hbTimestamp
;
// second
}
SNodeEntry
;
}
SNodeEntry
;
typedef
struct
SStreamVnodeRevertIndex
{
typedef
struct
SStreamVnodeRevertIndex
{
SArray
*
pDBList
;
SArray
*
pDBList
;
SArray
*
pNodeEntryList
;
SArray
*
pNodeEntryList
;
int64_t
ts
;
// snapshot ts
int64_t
ts
;
// snapshot ts
}
SStreamVnodeRevertIndex
;
}
SStreamVnodeRevertIndex
;
static
int32_t
mndNodeCheckSentinel
=
0
;
static
int32_t
mndNodeCheckSentinel
=
0
;
...
@@ -70,8 +70,8 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
...
@@ -70,8 +70,8 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
int64_t
streamId
,
int32_t
taskId
);
int64_t
streamId
,
int32_t
taskId
);
static
int32_t
mndProcessNodeCheckReq
(
SRpcMsg
*
pMsg
);
static
int32_t
mndProcessNodeCheckReq
(
SRpcMsg
*
pMsg
);
static
int32_t
mndPersistTransLog
(
SStreamObj
*
pStream
,
STrans
*
pTrans
);
static
int32_t
mndPersistTransLog
(
SStreamObj
*
pStream
,
STrans
*
pTrans
);
static
void
initTransAction
(
STransAction
*
pAction
,
void
*
pCont
,
int32_t
contLen
,
int32_t
msgType
,
const
SEpSet
*
pEpset
);
static
void
initTransAction
(
STransAction
*
pAction
,
void
*
pCont
,
int32_t
contLen
,
int32_t
msgType
,
const
SEpSet
*
pEpset
);
int32_t
mndInitStream
(
SMnode
*
pMnode
)
{
int32_t
mndInitStream
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{
SSdbTable
table
=
{
...
@@ -98,7 +98,7 @@ int32_t mndInitStream(SMnode *pMnode) {
...
@@ -98,7 +98,7 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle
(
pMnode
,
TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP
,
mndTransProcessRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP
,
mndTransProcessRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_STREAM_CHECKPOINT_TIMER
,
mndProcessStreamCheckpointTmr
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_STREAM_CHECKPOINT_TIMER
,
mndProcessStreamCheckpointTmr
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_STREAM_BEGIN_CHECKPOINT
,
mndProcessStreamDoCheckpoint
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_STREAM_BEGIN_CHECKPOINT
,
mndProcessStreamDoCheckpoint
);
// mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
// mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
mndSetMsgHandle
(
pMnode
,
TDMT_STREAM_TASK_REPORT_CHECKPOINT
,
mndTransProcessRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_STREAM_TASK_REPORT_CHECKPOINT
,
mndTransProcessRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_PAUSE_STREAM
,
mndProcessPauseStreamReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_PAUSE_STREAM
,
mndProcessPauseStreamReq
);
...
@@ -173,6 +173,7 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
...
@@ -173,6 +173,7 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
if
(
sver
!=
MND_STREAM_VER_NUMBER
)
{
if
(
sver
!=
MND_STREAM_VER_NUMBER
)
{
terrno
=
0
;
terrno
=
0
;
mError
(
"stream read invalid ver, data ver: %d, curr ver: %d"
,
sver
,
MND_STREAM_VER_NUMBER
);
goto
STREAM_DECODE_OVER
;
goto
STREAM_DECODE_OVER
;
}
}
...
@@ -778,16 +779,16 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
...
@@ -778,16 +779,16 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
}
}
}
}
// pDb = mndAcquireDb(pMnode, streamObj.sourceDb);
// pDb = mndAcquireDb(pMnode, streamObj.sourceDb);
// if (pDb->cfg.replications != 1) {
// if (pDb->cfg.replications != 1) {
// mError("stream source db must have only 1 replica, but %s has %d", pDb->name, pDb->cfg.replications);
// mError("stream source db must have only 1 replica, but %s has %d", pDb->name, pDb->cfg.replications);
// terrno = TSDB_CODE_MND_MULTI_REPLICA_SOURCE_DB;
// terrno = TSDB_CODE_MND_MULTI_REPLICA_SOURCE_DB;
// mndReleaseDb(pMnode, pDb);
// mndReleaseDb(pMnode, pDb);
// pDb = NULL;
// pDb = NULL;
// goto _OVER;
// goto _OVER;
// }
// }
// mndReleaseDb(pMnode, pDb);
// mndReleaseDb(pMnode, pDb);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_CONFLICT_DB_INSIDE
,
pReq
,
"create-stream"
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_CONFLICT_DB_INSIDE
,
pReq
,
"create-stream"
);
if
(
pTrans
==
NULL
)
{
if
(
pTrans
==
NULL
)
{
...
@@ -870,7 +871,7 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
...
@@ -870,7 +871,7 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
SRpcMsg
rpcMsg
=
{
SRpcMsg
rpcMsg
=
{
.
msgType
=
TDMT_MND_STREAM_BEGIN_CHECKPOINT
,
.
pCont
=
pMsg
,
.
contLen
=
sizeof
(
SMStreamDoCheckpointMsg
)};
.
msgType
=
TDMT_MND_STREAM_BEGIN_CHECKPOINT
,
.
pCont
=
pMsg
,
.
contLen
=
sizeof
(
SMStreamDoCheckpointMsg
)};
// tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
// tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
return
0
;
return
0
;
}
}
...
@@ -1051,7 +1052,7 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream
...
@@ -1051,7 +1052,7 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream
}
}
STransAction
action
=
{
0
};
STransAction
action
=
{
0
};
SEpSet
epset
=
mndGetVgroupEpset
(
pMnode
,
pVgObj
);
SEpSet
epset
=
mndGetVgroupEpset
(
pMnode
,
pVgObj
);
initTransAction
(
&
action
,
buf
,
tlen
,
TDMT_VND_STREAM_CHECK_POINT_SOURCE
,
&
epset
);
initTransAction
(
&
action
,
buf
,
tlen
,
TDMT_VND_STREAM_CHECK_POINT_SOURCE
,
&
epset
);
mndReleaseVgroup
(
pMnode
,
pVgObj
);
mndReleaseVgroup
(
pMnode
,
pVgObj
);
...
@@ -1168,6 +1169,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
...
@@ -1168,6 +1169,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
sdbRelease
(
pMnode
->
pSdb
,
pStream
);
sdbRelease
(
pMnode
->
pSdb
,
pStream
);
return
-
1
;
return
-
1
;
}
}
mInfo
(
"trans:%d, used to drop stream:%s"
,
pTrans
->
id
,
dropReq
.
name
);
mInfo
(
"trans:%d, used to drop stream:%s"
,
pTrans
->
id
,
dropReq
.
name
);
mndTransSetDbName
(
pTrans
,
pStream
->
sourceDb
,
pStream
->
targetDb
);
mndTransSetDbName
(
pTrans
,
pStream
->
sourceDb
,
pStream
->
targetDb
);
...
@@ -1176,6 +1178,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
...
@@ -1176,6 +1178,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
mndTransDrop
(
pTrans
);
mndTransDrop
(
pTrans
);
return
-
1
;
return
-
1
;
}
}
//mndTransSetSerial(pTrans);
// drop all tasks
// drop all tasks
if
(
mndDropStreamTasks
(
pMnode
,
pTrans
,
pStream
)
<
0
)
{
if
(
mndDropStreamTasks
(
pMnode
,
pTrans
,
pStream
)
<
0
)
{
...
@@ -1742,11 +1745,12 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
...
@@ -1742,11 +1745,12 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
}
}
typedef
struct
SVgroupChangeInfo
{
typedef
struct
SVgroupChangeInfo
{
SHashObj
*
pDBMap
;
SHashObj
*
pDBMap
;
SArray
*
pUpdateNodeList
;
//
SArray<SNodeUpdateInfo>
SArray
*
pUpdateNodeList
;
//
SArray<SNodeUpdateInfo>
}
SVgroupChangeInfo
;
}
SVgroupChangeInfo
;
static
void
initNodeUpdateMsg
(
SStreamTaskNodeUpdateMsg
*
pMsg
,
const
SVgroupChangeInfo
*
pInfo
,
int64_t
streamId
,
int32_t
taskId
)
{
static
void
initNodeUpdateMsg
(
SStreamTaskNodeUpdateMsg
*
pMsg
,
const
SVgroupChangeInfo
*
pInfo
,
int64_t
streamId
,
int32_t
taskId
)
{
pMsg
->
streamId
=
streamId
;
pMsg
->
streamId
=
streamId
;
pMsg
->
taskId
=
taskId
;
pMsg
->
taskId
=
taskId
;
pMsg
->
pNodeList
=
taosArrayInit
(
taosArrayGetSize
(
pInfo
->
pUpdateNodeList
),
sizeof
(
SNodeUpdateInfo
));
pMsg
->
pNodeList
=
taosArrayInit
(
taosArrayGetSize
(
pInfo
->
pUpdateNodeList
),
sizeof
(
SNodeUpdateInfo
));
...
@@ -1792,7 +1796,7 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha
...
@@ -1792,7 +1796,7 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
mndPersistTransLog
(
SStreamObj
*
pStream
,
STrans
*
pTrans
)
{
int32_t
mndPersistTransLog
(
SStreamObj
*
pStream
,
STrans
*
pTrans
)
{
SSdbRaw
*
pCommitRaw
=
mndStreamActionEncode
(
pStream
);
SSdbRaw
*
pCommitRaw
=
mndStreamActionEncode
(
pStream
);
if
(
pCommitRaw
==
NULL
)
{
if
(
pCommitRaw
==
NULL
)
{
mError
(
"failed to encode stream since %s"
,
terrstr
());
mError
(
"failed to encode stream since %s"
,
terrstr
());
...
@@ -1816,7 +1820,7 @@ int32_t mndPersistTransLog(SStreamObj* pStream, STrans* pTrans) {
...
@@ -1816,7 +1820,7 @@ int32_t mndPersistTransLog(SStreamObj* pStream, STrans* pTrans) {
return
0
;
return
0
;
}
}
void
initTransAction
(
STransAction
*
pAction
,
void
*
pCont
,
int32_t
contLen
,
int32_t
msgType
,
const
SEpSet
*
pEpset
)
{
void
initTransAction
(
STransAction
*
pAction
,
void
*
pCont
,
int32_t
contLen
,
int32_t
msgType
,
const
SEpSet
*
pEpset
)
{
pAction
->
epSet
=
*
pEpset
;
pAction
->
epSet
=
*
pEpset
;
pAction
->
contLen
=
contLen
;
pAction
->
contLen
=
contLen
;
pAction
->
pCont
=
pCont
;
pAction
->
pCont
=
pCont
;
...
@@ -1825,7 +1829,7 @@ void initTransAction(STransAction* pAction, void* pCont, int32_t contLen, int32_
...
@@ -1825,7 +1829,7 @@ void initTransAction(STransAction* pAction, void* pCont, int32_t contLen, int32_
// todo extract method: traverse stream tasks
// todo extract method: traverse stream tasks
// build trans to update the epset
// build trans to update the epset
static
int32_t
createStreamUpdateTrans
(
SMnode
*
pMnode
,
SStreamObj
*
pStream
,
SVgroupChangeInfo
*
pInfo
)
{
static
int32_t
createStreamUpdateTrans
(
SMnode
*
pMnode
,
SStreamObj
*
pStream
,
SVgroupChangeInfo
*
pInfo
)
{
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_CONFLICT_DB_INSIDE
,
NULL
,
"stream-task-update"
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_CONFLICT_DB_INSIDE
,
NULL
,
"stream-task-update"
);
if
(
pTrans
==
NULL
)
{
if
(
pTrans
==
NULL
)
{
mError
(
"failed to build stream task DAG update, reason: %s"
,
tstrerror
(
TSDB_CODE_OUT_OF_MEMORY
));
mError
(
"failed to build stream task DAG update, reason: %s"
,
tstrerror
(
TSDB_CODE_OUT_OF_MEMORY
));
...
@@ -1889,11 +1893,11 @@ static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgr
...
@@ -1889,11 +1893,11 @@ static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgr
return
TSDB_CODE_ACTION_IN_PROGRESS
;
return
TSDB_CODE_ACTION_IN_PROGRESS
;
}
}
static
bool
isNodeEpsetChanged
(
const
SEpSet
*
pPrevEpset
,
const
SEpSet
*
pCurrent
)
{
static
bool
isNodeEpsetChanged
(
const
SEpSet
*
pPrevEpset
,
const
SEpSet
*
pCurrent
)
{
const
SEp
*
pEp
=
GET_ACTIVE_EP
(
pPrevEpset
);
const
SEp
*
pEp
=
GET_ACTIVE_EP
(
pPrevEpset
);
for
(
int32_t
i
=
0
;
i
<
pCurrent
->
numOfEps
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pCurrent
->
numOfEps
;
++
i
)
{
const
SEp
*
p
=
&
(
pCurrent
->
eps
[
i
]);
const
SEp
*
p
=
&
(
pCurrent
->
eps
[
i
]);
if
(
pEp
->
port
==
p
->
port
&&
strncmp
(
pEp
->
fqdn
,
p
->
fqdn
,
TSDB_FQDN_LEN
)
==
0
)
{
if
(
pEp
->
port
==
p
->
port
&&
strncmp
(
pEp
->
fqdn
,
p
->
fqdn
,
TSDB_FQDN_LEN
)
==
0
)
{
return
false
;
return
false
;
}
}
...
@@ -1948,12 +1952,12 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
...
@@ -1948,12 +1952,12 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
return
info
;
return
info
;
}
}
static
SArray
*
mndTakeVgroupSnapshot
(
SMnode
*
pMnode
)
{
static
SArray
*
mndTakeVgroupSnapshot
(
SMnode
*
pMnode
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
void
*
pIter
=
NULL
;
void
*
pIter
=
NULL
;
SVgObj
*
pVgroup
=
NULL
;
SVgObj
*
pVgroup
=
NULL
;
SArray
*
pVgroupListSnapshot
=
taosArrayInit
(
4
,
sizeof
(
SNodeEntry
));
SArray
*
pVgroupListSnapshot
=
taosArrayInit
(
4
,
sizeof
(
SNodeEntry
));
while
(
1
)
{
while
(
1
)
{
pIter
=
sdbFetch
(
pSdb
,
SDB_VGROUP
,
pIter
,
(
void
**
)
&
pVgroup
);
pIter
=
sdbFetch
(
pSdb
,
SDB_VGROUP
,
pIter
,
(
void
**
)
&
pVgroup
);
...
@@ -1973,7 +1977,7 @@ static SArray* mndTakeVgroupSnapshot(SMnode* pMnode) {
...
@@ -1973,7 +1977,7 @@ static SArray* mndTakeVgroupSnapshot(SMnode* pMnode) {
return
pVgroupListSnapshot
;
return
pVgroupListSnapshot
;
}
}
int32_t
mndProcessVgroupChange
(
SMnode
*
pMnode
,
SVgroupChangeInfo
*
pChangeInfo
)
{
int32_t
mndProcessVgroupChange
(
SMnode
*
pMnode
,
SVgroupChangeInfo
*
pChangeInfo
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
// check all streams that involved this vnode should update the epset info
// check all streams that involved this vnode should update the epset info
...
@@ -1985,14 +1989,14 @@ int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo* pChangeInfo) {
...
@@ -1985,14 +1989,14 @@ int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo* pChangeInfo) {
break
;
break
;
}
}
void
*
p
=
taosHashGet
(
pChangeInfo
->
pDBMap
,
pStream
->
targetDb
,
strlen
(
pStream
->
targetDb
));
void
*
p
=
taosHashGet
(
pChangeInfo
->
pDBMap
,
pStream
->
targetDb
,
strlen
(
pStream
->
targetDb
));
void
*
p1
=
taosHashGet
(
pChangeInfo
->
pDBMap
,
pStream
->
sourceDb
,
strlen
(
pStream
->
sourceDb
));
void
*
p1
=
taosHashGet
(
pChangeInfo
->
pDBMap
,
pStream
->
sourceDb
,
strlen
(
pStream
->
sourceDb
));
if
(
p
==
NULL
&&
p1
==
NULL
)
{
if
(
p
==
NULL
&&
p1
==
NULL
)
{
mndReleaseStream
(
pMnode
,
pStream
);
mndReleaseStream
(
pMnode
,
pStream
);
continue
;
continue
;
}
}
mDebug
(
"stream:0x%"
PRIx64
" involved node changed, create update trans"
,
pStream
->
uid
);
mDebug
(
"stream:0x%"
PRIx64
" involved node changed, create update trans"
,
pStream
->
uid
);
int32_t
code
=
createStreamUpdateTrans
(
pMnode
,
pStream
,
pChangeInfo
);
int32_t
code
=
createStreamUpdateTrans
(
pMnode
,
pStream
,
pChangeInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
// todo
// todo
...
@@ -2002,12 +2006,12 @@ int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo* pChangeInfo) {
...
@@ -2002,12 +2006,12 @@ int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo* pChangeInfo) {
return
0
;
return
0
;
}
}
static
SArray
*
doExtractNodeListFromStream
(
SMnode
*
pMnode
)
{
static
SArray
*
doExtractNodeListFromStream
(
SMnode
*
pMnode
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SStreamObj
*
pStream
=
NULL
;
SStreamObj
*
pStream
=
NULL
;
void
*
pIter
=
NULL
;
void
*
pIter
=
NULL
;
SHashObj
*
pHash
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
false
,
HASH_NO_LOCK
);
SHashObj
*
pHash
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
false
,
HASH_NO_LOCK
);
while
(
1
)
{
while
(
1
)
{
pIter
=
sdbFetch
(
pSdb
,
SDB_STREAM
,
pIter
,
(
void
**
)
&
pStream
);
pIter
=
sdbFetch
(
pSdb
,
SDB_STREAM
,
pIter
,
(
void
**
)
&
pStream
);
if
(
pIter
==
NULL
)
{
if
(
pIter
==
NULL
)
{
...
@@ -2023,7 +2027,7 @@ static SArray* doExtractNodeListFromStream(SMnode *pMnode) {
...
@@ -2023,7 +2027,7 @@ static SArray* doExtractNodeListFromStream(SMnode *pMnode) {
int32_t
numOfTasks
=
taosArrayGetSize
(
pLevel
);
int32_t
numOfTasks
=
taosArrayGetSize
(
pLevel
);
for
(
int32_t
k
=
0
;
k
<
numOfTasks
;
++
k
)
{
for
(
int32_t
k
=
0
;
k
<
numOfTasks
;
++
k
)
{
SStreamTask
*
pTask
=
taosArrayGetP
(
pLevel
,
k
);
SStreamTask
*
pTask
=
taosArrayGetP
(
pLevel
,
k
);
SNodeEntry
entry
=
{
0
};
SNodeEntry
entry
=
{
0
};
epsetAssign
(
&
entry
.
epset
,
&
pTask
->
info
.
epSet
);
epsetAssign
(
&
entry
.
epset
,
&
pTask
->
info
.
epSet
);
entry
.
nodeId
=
pTask
->
info
.
nodeId
;
entry
.
nodeId
=
pTask
->
info
.
nodeId
;
entry
.
hbTimestamp
=
-
1
;
entry
.
hbTimestamp
=
-
1
;
...
@@ -2033,14 +2037,15 @@ static SArray* doExtractNodeListFromStream(SMnode *pMnode) {
...
@@ -2033,14 +2037,15 @@ static SArray* doExtractNodeListFromStream(SMnode *pMnode) {
}
}
taosWUnLockLatch
(
&
pStream
->
lock
);
taosWUnLockLatch
(
&
pStream
->
lock
);
sdbRelease
(
pSdb
,
pStream
);
}
}
SArray
*
plist
=
taosArrayInit
(
taosHashGetSize
(
pHash
),
sizeof
(
SNodeEntry
));
SArray
*
plist
=
taosArrayInit
(
taosHashGetSize
(
pHash
),
sizeof
(
SNodeEntry
));
// convert to list
// convert to list
pIter
=
NULL
;
pIter
=
NULL
;
while
((
pIter
=
taosHashIterate
(
pHash
,
pIter
))
!=
NULL
)
{
while
((
pIter
=
taosHashIterate
(
pHash
,
pIter
))
!=
NULL
)
{
SNodeEntry
*
pEntry
=
(
SNodeEntry
*
)
pIter
;
SNodeEntry
*
pEntry
=
(
SNodeEntry
*
)
pIter
;
taosArrayPush
(
plist
,
pEntry
);
taosArrayPush
(
plist
,
pEntry
);
}
}
taosHashCleanup
(
pHash
);
taosHashCleanup
(
pHash
);
...
@@ -2094,7 +2099,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
...
@@ -2094,7 +2099,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
}
}
// todo: this process should be executed by the write queue worker of the mnode
// todo: this process should be executed by the write queue worker of the mnode
//int32_t mndProcessStreamHb(SRpcMsg *pReq) {
//
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
// SMnode *pMnode = pReq->info.node;
// SMnode *pMnode = pReq->info.node;
// SSdb *pSdb = pMnode->pSdb;
// SSdb *pSdb = pMnode->pSdb;
// SStreamHbMsg req = {0};
// SStreamHbMsg req = {0};
...
...
source/dnode/mnode/sdb/src/sdbRaw.c
浏览文件 @
46574c38
...
@@ -46,7 +46,7 @@ SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen) {
...
@@ -46,7 +46,7 @@ SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen) {
void
sdbFreeRaw
(
SSdbRaw
*
pRaw
)
{
void
sdbFreeRaw
(
SSdbRaw
*
pRaw
)
{
if
(
pRaw
!=
NULL
)
{
if
(
pRaw
!=
NULL
)
{
#if 1
#if 1
mTrace
(
"raw:%p, is freed
"
,
pRaw
);
mTrace
(
"raw:%p, is freed
, len:%d, table:%s"
,
pRaw
,
pRaw
->
dataLen
,
sdbTableName
(
pRaw
->
type
)
);
#endif
#endif
taosMemoryFree
(
pRaw
);
taosMemoryFree
(
pRaw
);
}
}
...
...
source/libs/stream/src/streamBackendRocksdb.c
浏览文件 @
46574c38
...
@@ -1412,7 +1412,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
...
@@ -1412,7 +1412,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
}
}
int
streamStateOpenBackend
(
void
*
backend
,
SStreamState
*
pState
)
{
int
streamStateOpenBackend
(
void
*
backend
,
SStreamState
*
pState
)
{
qInfo
(
"start to open state %p on backend %p 0x%"
PRIx64
"-%d"
,
pState
,
backend
,
pState
->
streamId
,
pState
->
taskId
);
qInfo
(
"start to open state %p on backend %p 0x%"
PRIx64
"-%d"
,
pState
,
backend
,
pState
->
streamId
,
pState
->
taskId
);
//
taosAcquireRef(streamBackendId, pState->streamBackendRid);
taosAcquireRef
(
streamBackendId
,
pState
->
streamBackendRid
);
SBackendWrapper
*
handle
=
backend
;
SBackendWrapper
*
handle
=
backend
;
SBackendCfWrapper
*
pBackendCfWrapper
=
taosMemoryCalloc
(
1
,
sizeof
(
SBackendCfWrapper
));
SBackendCfWrapper
*
pBackendCfWrapper
=
taosMemoryCalloc
(
1
,
sizeof
(
SBackendCfWrapper
));
...
@@ -1495,6 +1495,9 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
...
@@ -1495,6 +1495,9 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
void
streamStateCloseBackend
(
SStreamState
*
pState
,
bool
remove
)
{
void
streamStateCloseBackend
(
SStreamState
*
pState
,
bool
remove
)
{
SBackendCfWrapper
*
wrapper
=
pState
->
pTdbState
->
pBackendCfWrapper
;
SBackendCfWrapper
*
wrapper
=
pState
->
pTdbState
->
pBackendCfWrapper
;
SBackendWrapper
*
pHandle
=
wrapper
->
pBackend
;
SBackendWrapper
*
pHandle
=
wrapper
->
pBackend
;
qInfo
(
"start to close state on backend: %p"
,
pHandle
);
taosThreadMutexLock
(
&
pHandle
->
cfMutex
);
taosThreadMutexLock
(
&
pHandle
->
cfMutex
);
RocksdbCfInst
**
ppInst
=
taosHashGet
(
pHandle
->
cfInst
,
wrapper
->
idstr
,
strlen
(
pState
->
pTdbState
->
idstr
)
+
1
);
RocksdbCfInst
**
ppInst
=
taosHashGet
(
pHandle
->
cfInst
,
wrapper
->
idstr
,
strlen
(
pState
->
pTdbState
->
idstr
)
+
1
);
if
(
ppInst
!=
NULL
&&
*
ppInst
!=
NULL
)
{
if
(
ppInst
!=
NULL
&&
*
ppInst
!=
NULL
)
{
...
@@ -1505,7 +1508,7 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) {
...
@@ -1505,7 +1508,7 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) {
taosThreadMutexUnlock
(
&
pHandle
->
cfMutex
);
taosThreadMutexUnlock
(
&
pHandle
->
cfMutex
);
char
*
status
[]
=
{
"close"
,
"drop"
};
char
*
status
[]
=
{
"close"
,
"drop"
};
qInfo
(
"start to
close
%s state %p on backendWrapper %p %s"
,
status
[
remove
==
false
?
0
:
1
],
pState
,
wrapper
,
qInfo
(
"start to %s state %p on backendWrapper %p %s"
,
status
[
remove
==
false
?
0
:
1
],
pState
,
wrapper
,
wrapper
->
idstr
);
wrapper
->
idstr
);
wrapper
->
remove
|=
remove
;
// update by other pState
wrapper
->
remove
|=
remove
;
// update by other pState
taosReleaseRef
(
streamBackendCfWrapperId
,
pState
->
pTdbState
->
backendCfWrapperId
);
taosReleaseRef
(
streamBackendCfWrapperId
,
pState
->
pTdbState
->
backendCfWrapperId
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录