Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
f3e8c65b
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
f3e8c65b
编写于
3月 23, 2022
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'feature/scheduler' of github.com:taosdata/TDengine into feature/scheduler
上级
c80da5f7
60de1975
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
65 addition
and
41 deletion
+65
-41
source/dnode/mgmt/container/src/dndInt.c
source/dnode/mgmt/container/src/dndInt.c
+2
-1
source/dnode/mgmt/dnode/src/dmWorker.c
source/dnode/mgmt/dnode/src/dmWorker.c
+2
-2
source/libs/transport/inc/transComm.h
source/libs/transport/inc/transComm.h
+1
-1
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+40
-17
source/libs/transport/src/transSrv.c
source/libs/transport/src/transSrv.c
+20
-20
未找到文件。
source/dnode/mgmt/container/src/dndInt.c
浏览文件 @
f3e8c65b
...
...
@@ -135,6 +135,7 @@ void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
dDebug
(
"startup req is sent, step:%s desc:%s finished:%d"
,
pStartup
->
name
,
pStartup
->
desc
,
pStartup
->
finished
);
SRpcMsg
rpcRsp
=
{.
handle
=
pReq
->
handle
,
.
pCont
=
pStartup
,
.
contLen
=
sizeof
(
SStartupReq
),
.
ahandle
=
NULL
};
SRpcMsg
rpcRsp
=
{
.
handle
=
pReq
->
handle
,
.
pCont
=
pStartup
,
.
contLen
=
sizeof
(
SStartupReq
),
.
ahandle
=
pReq
->
ahandle
};
rpcSendResponse
(
&
rpcRsp
);
}
source/dnode/mgmt/dnode/src/dmWorker.c
浏览文件 @
f3e8c65b
...
...
@@ -23,7 +23,7 @@
static
void
*
dmThreadRoutine
(
void
*
param
)
{
SDnodeMgmt
*
pMgmt
=
param
;
SDnode
*
pDnode
=
pMgmt
->
pDnode
;
SDnode
*
pDnode
=
pMgmt
->
pDnode
;
int64_t
lastStatusTime
=
taosGetTimestampMs
();
int64_t
lastMonitorTime
=
lastStatusTime
;
...
...
@@ -55,7 +55,7 @@ static void *dmThreadRoutine(void *param) {
static
void
dmProcessQueue
(
SQueueInfo
*
pInfo
,
SNodeMsg
*
pMsg
)
{
SDnodeMgmt
*
pMgmt
=
pInfo
->
ahandle
;
SDnode
*
pDnode
=
pMgmt
->
pDnode
;
SDnode
*
pDnode
=
pMgmt
->
pDnode
;
SRpcMsg
*
pRpc
=
&
pMsg
->
rpcMsg
;
int32_t
code
=
-
1
;
dTrace
(
"msg:%p, will be processed in dnode queue"
,
pMsg
);
...
...
source/libs/transport/inc/transComm.h
浏览文件 @
f3e8c65b
...
...
@@ -183,7 +183,7 @@ typedef struct {
#pragma pack(pop)
typedef
enum
{
Normal
,
Quit
,
Release
,
Register
}
STransMsgType
;
typedef
enum
{
ConnNormal
,
ConnAcquire
,
ConnRelease
,
ConnBroken
}
ConnStatus
;
typedef
enum
{
ConnNormal
,
ConnAcquire
,
ConnRelease
,
ConnBroken
,
ConnInPool
}
ConnStatus
;
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
#define RPC_RESERVE_SIZE (sizeof(STranConnCtx))
...
...
source/libs/transport/src/transCli.c
浏览文件 @
f3e8c65b
...
...
@@ -53,6 +53,7 @@ typedef struct SCliMsg {
queue
q
;
uint64_t
st
;
STransMsgType
type
;
int
sent
;
//(0: no send, 1: alread sent)
}
SCliMsg
;
typedef
struct
SCliThrdObj
{
...
...
@@ -135,6 +136,8 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
#define CONN_SHOULD_RELEASE(conn, head) \
do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
uint64_t ahandle = head->ahandle; \
CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); \
conn->status = ConnRelease; \
transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \
...
...
@@ -146,6 +149,7 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
SCliThrdObj* thrd = conn->hostThrd; \
addConnToPool(thrd->pool, conn); \
} \
destroyCmsg(pMsg); \
return; \
} \
} while (0)
...
...
@@ -198,8 +202,18 @@ static void* cliWorkThread(void* arg);
bool
cliMaySendCachedMsg
(
SCliConn
*
conn
)
{
if
(
!
transQueueEmpty
(
&
conn
->
cliMsgs
))
{
SCliMsg
*
pCliMsg
=
NULL
;
int
i
=
0
;
do
{
pCliMsg
=
transQueueGet
(
&
conn
->
cliMsgs
,
i
++
);
if
(
pCliMsg
&&
0
==
pCliMsg
->
sent
)
{
break
;
}
}
while
(
pCliMsg
!=
NULL
);
if
(
pCliMsg
==
NULL
)
{
return
false
;
}
cliSend
(
conn
);
return
true
;
}
return
false
;
}
...
...
@@ -218,33 +232,27 @@ void cliHandleResp(SCliConn* conn) {
transMsg
.
msgType
=
pHead
->
msgType
;
transMsg
.
ahandle
=
NULL
;
CONN_SHOULD_RELEASE
(
conn
,
pHead
);
SCliMsg
*
pMsg
=
NULL
;
STransConnCtx
*
pCtx
=
NULL
;
CONN_SHOULD_RELEASE
(
conn
,
pHead
);
if
(
CONN_NO_PERSIST_BY_APP
(
conn
))
{
pMsg
=
transQueuePop
(
&
conn
->
cliMsgs
);
/// uint64_t ahandle = (uint64_t)pHead->ahandle;
// CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);
pCtx
=
pMsg
?
pMsg
->
ctx
:
NULL
;
if
(
pMsg
==
NULL
&&
!
CONN_NO_PERSIST_BY_APP
(
conn
))
{
transMsg
.
ahandle
=
transCtxDumpVal
(
&
conn
->
ctx
,
transMsg
.
msgType
);
if
(
transMsg
.
ahandle
==
NULL
)
{
transMsg
.
ahandle
=
transCtxDumpBrokenlinkVal
(
&
conn
->
ctx
,
(
int32_t
*
)
&
(
transMsg
.
msgType
));
}
tDebug
(
"cli conn %p construct ahandle %p, persist: 0"
,
conn
,
transMsg
.
ahandle
);
}
else
{
transMsg
.
ahandle
=
pCtx
?
pCtx
->
ahandle
:
NULL
;
tDebug
(
"cli conn %p get ahandle %p, persist: 0"
,
conn
,
transMsg
.
ahandle
);
}
transMsg
.
ahandle
=
pCtx
?
pCtx
->
ahandle
:
NULL
;
tDebug
(
"cli conn %p get ahandle %p, persist: 0"
,
conn
,
transMsg
.
ahandle
);
}
else
{
uint64_t
ahandle
=
(
uint64_t
)
pHead
->
ahandle
;
CONN_GET_MSGCTX_BY_AHANDLE
(
conn
,
ahandle
);
if
(
pMsg
==
NULL
)
{
transMsg
.
ahandle
=
transCtxDumpVal
(
&
conn
->
ctx
,
transMsg
.
msgType
);
tDebug
(
"cli conn %p construct ahandle %p by %d, persist: 1"
,
conn
,
transMsg
.
ahandle
,
transMsg
.
msgType
);
if
(
transMsg
.
ahandle
==
NULL
)
{
tDebug
(
"cli conn %p construct ahandle %p due brokenlink, persist: 1"
,
conn
,
transMsg
.
ahandle
);
transMsg
.
ahandle
=
transCtxDumpBrokenlinkVal
(
&
conn
->
ctx
,
(
int32_t
*
)
&
(
transMsg
.
msgType
));
}
tDebug
(
"cli conn %p construct ahandle %p, persist: 1"
,
conn
,
transMsg
.
ahandle
);
}
else
{
pCtx
=
pMsg
?
pMsg
->
ctx
:
NULL
;
transMsg
.
ahandle
=
pCtx
?
pCtx
->
ahandle
:
NULL
;
...
...
@@ -419,7 +427,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
conn
->
expireTime
=
taosGetTimestampMs
()
+
CONN_PERSIST_TIME
(
pTransInst
->
idleTime
);
transCtxCleanup
(
&
conn
->
ctx
);
transQueueClear
(
&
conn
->
cliMsgs
);
conn
->
status
=
Conn
Norma
l
;
conn
->
status
=
Conn
InPoo
l
;
char
key
[
128
]
=
{
0
};
tstrncpy
(
key
,
conn
->
ip
,
strlen
(
conn
->
ip
));
...
...
@@ -546,7 +554,21 @@ void cliSend(SCliConn* pConn) {
// assert(taosArrayGetSize(pConn->cliMsgs) > 0);
assert
(
!
transQueueEmpty
(
&
pConn
->
cliMsgs
));
SCliMsg
*
pCliMsg
=
transQueueGet
(
&
pConn
->
cliMsgs
,
0
);
SCliMsg
*
pCliMsg
=
NULL
;
int
i
=
0
;
do
{
pCliMsg
=
transQueueGet
(
&
pConn
->
cliMsgs
,
i
++
);
if
(
pCliMsg
&&
0
==
pCliMsg
->
sent
)
{
break
;
}
}
while
(
pCliMsg
!=
NULL
);
if
(
pCliMsg
==
NULL
)
{
return
;
}
pCliMsg
->
sent
=
1
;
STransConnCtx
*
pCtx
=
pCliMsg
->
ctx
;
SCliThrdObj
*
pThrd
=
pConn
->
hostThrd
;
...
...
@@ -558,7 +580,7 @@ void cliSend(SCliConn* pConn) {
pMsg
->
contLen
=
0
;
}
STransMsgHead
*
pHead
=
transHeadFromCont
(
pMsg
->
pCont
);
pHead
->
ahandle
=
(
uint64_t
)
pCtx
->
ahandle
;
pHead
->
ahandle
=
pCtx
!=
NULL
?
(
uint64_t
)
pCtx
->
ahandle
:
0
;
int
msgLen
=
transMsgLenFromCont
(
pMsg
->
contLen
);
...
...
@@ -868,6 +890,7 @@ void transReleaseCliHandle(void* handle) {
STransMsg
tmsg
=
{.
handle
=
handle
};
SCliMsg
*
cmsg
=
calloc
(
1
,
sizeof
(
SCliMsg
));
cmsg
->
msg
=
tmsg
;
cmsg
->
type
=
Release
;
...
...
source/libs/transport/src/transSrv.c
浏览文件 @
f3e8c65b
...
...
@@ -93,25 +93,25 @@ typedef struct SServerObj {
static
const
char
*
notify
=
"a"
;
#define CONN_SHOULD_RELEASE(conn, head) \
do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
conn->status = ConnRelease; \
transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \
tTrace("server conn %p received release request", conn); \
\
STransMsg tmsg = {.
handle = (void*)conn, .code = 0};
\
SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg)); \
srvMsg->msg = tmsg; \
srvMsg->type = Release; \
srvMsg->pConn = conn; \
if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \
return; \
} \
uvStartSendRespInternal(srvMsg); \
return; \
} \
#define CONN_SHOULD_RELEASE(conn, head)
\
do {
\
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) {
\
conn->status = ConnRelease;
\
transClearBuffer(&conn->readBuf);
\
transFreeMsg(transContFromHead((char*)head));
\
tTrace("server conn %p received release request", conn);
\
\
STransMsg tmsg = {.
code = 0, .handle = (void*)conn, .ahandle = NULL};
\
SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
\
srvMsg->msg = tmsg;
\
srvMsg->type = Release;
\
srvMsg->pConn = conn;
\
if (!transQueuePush(&conn->srvMsgs, srvMsg)) {
\
return;
\
}
\
uvStartSendRespInternal(srvMsg);
\
return;
\
}
\
} while (0)
static
void
uvAllocConnBufferCb
(
uv_handle_t
*
handle
,
size_t
suggested_size
,
uv_buf_t
*
buf
);
...
...
@@ -823,7 +823,7 @@ void transReleaseSrvHandle(void* handle) {
SSrvConn
*
pConn
=
handle
;
SWorkThrdObj
*
pThrd
=
pConn
->
hostThrd
;
STransMsg
tmsg
=
{.
handle
=
handle
,
.
code
=
0
};
STransMsg
tmsg
=
{.
code
=
0
,
.
handle
=
handle
,
.
ahandle
=
NULL
};
SSrvMsg
*
srvMsg
=
calloc
(
1
,
sizeof
(
SSrvMsg
));
srvMsg
->
msg
=
tmsg
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录