Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
5401c687
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
5401c687
编写于
6月 18, 2022
作者:
dengyihao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
enh: add trace
上级
09df29e1
变更
10
隐藏空白更改
内联
并排
Showing
10 changed file
with
176 addition
and
119 deletion
+176
-119
include/libs/transport/trpc.h
include/libs/transport/trpc.h
+7
-4
include/util/ttrace.h
include/util/ttrace.h
+22
-7
source/libs/qcom/src/queryUtil.c
source/libs/qcom/src/queryUtil.c
+3
-5
source/libs/transport/inc/transComm.h
source/libs/transport/inc/transComm.h
+2
-0
source/libs/transport/inc/transLog.h
source/libs/transport/inc/transLog.h
+5
-0
source/libs/transport/src/trans.c
source/libs/transport/src/trans.c
+5
-0
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+50
-36
source/libs/transport/src/transComm.c
source/libs/transport/src/transComm.c
+3
-2
source/libs/transport/src/transSvr.c
source/libs/transport/src/transSvr.c
+21
-18
source/util/src/ttrace.c
source/util/src/ttrace.c
+58
-47
未找到文件。
include/libs/transport/trpc.h
浏览文件 @
5401c687
...
...
@@ -23,6 +23,7 @@ extern "C" {
#include <stdint.h>
#include "taosdef.h"
#include "tmsg.h"
#include "ttrace.h"
#define TAOS_CONN_SERVER 0
#define TAOS_CONN_CLIENT 1
...
...
@@ -41,10 +42,12 @@ typedef struct {
typedef
struct
SRpcHandleInfo
{
// rpc info
void
*
handle
;
// rpc handle returned to app
int64_t
refId
;
// refid, used by server
int32_t
noResp
;
// has response or not(default 0, 0: resp, 1: no resp);
int32_t
persistHandle
;
// persist handle or not
void
*
handle
;
// rpc handle returned to app
int64_t
refId
;
// refid, used by server
int32_t
noResp
;
// has response or not(default 0, 0: resp, 1: no resp);
int32_t
persistHandle
;
// persist handle or not
STraceId
traceId
;
// int64_t traceId;
SRpcConnInfo
connInfo
;
// app info
...
...
include/util/ttrace.h
浏览文件 @
5401c687
...
...
@@ -21,18 +21,33 @@
extern
"C"
{
#endif
typedef
int64_t
STraceId
;
typedef
int32_t
STraceSubId
;
#pragma(push, 1)
typedef
struct
{
int64_t
rootId
;
int64_t
msgId
;
}
STraceId
;
STraceId
traceInitId
(
STraceSubId
*
h
,
STraceSubId
*
l
);
#pragma(pop)
void
traceId2Str
(
STraceId
*
id
,
char
*
buf
);
#define TRACE_SET_ROOTID(traceId, root) \
do { \
(traceId)->rootId = root; \
} while (0);
void
traceSetSubId
(
STraceId
*
id
,
int32_t
*
subId
);
#define TRACE_GET_ROOTID(traceId) (traceId)->rootId
STraceSubId
traceGetParentId
(
STraceId
*
id
);
#define TRACE_SET_MSGID(traceId, mId) \
do { \
(traceId)->msgId = mId; \
} while (0)
#define TRACE_GET_MSGID(traceId) (traceId)->msgId
#define TRACE_TO_STR(traceId, buf) \
do { \
sprintf(buf, "0x%" PRIx64 ": 0x%" PRIx64 "", traceId->rootId, traceId->msgId); \
} while (0)
STraceSubId
traceGenSubId
();
#ifdef __cplusplus
}
#endif
...
...
source/libs/qcom/src/queryUtil.c
浏览文件 @
5401c687
...
...
@@ -154,7 +154,7 @@ int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTra
.
info
.
persistHandle
=
persistHandle
,
.
code
=
0
};
assert
(
pInfo
->
fp
!=
NULL
);
TRACE_SET_ROOTID
(
&
rpcMsg
.
info
.
traceId
,
pInfo
->
requestId
);
rpcSendRequestWithCtx
(
pTransporter
,
epSet
,
&
rpcMsg
,
pTransporterId
,
rpcCtx
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -208,14 +208,14 @@ void destroyQueryExecRes(SQueryExecRes* pRes) {
switch
(
pRes
->
msgType
)
{
case
TDMT_VND_ALTER_TABLE
:
case
TDMT_MND_ALTER_STB
:
{
tFreeSTableMetaRsp
((
STableMetaRsp
*
)
pRes
->
res
);
tFreeSTableMetaRsp
((
STableMetaRsp
*
)
pRes
->
res
);
taosMemoryFreeClear
(
pRes
->
res
);
break
;
}
case
TDMT_VND_SUBMIT
:
{
tFreeSSubmitRsp
((
SSubmitRsp
*
)
pRes
->
res
);
break
;
}
}
case
TDMT_VND_QUERY
:
{
taosArrayDestroy
((
SArray
*
)
pRes
->
res
);
break
;
...
...
@@ -224,5 +224,3 @@ void destroyQueryExecRes(SQueryExecRes* pRes) {
qError
(
"invalid exec result for request type %d"
,
pRes
->
msgType
);
}
}
source/libs/transport/inc/transComm.h
浏览文件 @
5401c687
...
...
@@ -26,6 +26,7 @@ extern "C" {
#include "transLog.h"
#include "transportInt.h"
#include "trpc.h"
#include "ttrace.h"
#include "tutil.h"
typedef
void
*
queue
[
2
];
...
...
@@ -140,6 +141,7 @@ typedef struct {
char
spi
:
2
;
char
user
[
TSDB_UNI_LEN
];
STraceId
traceId
;
uint64_t
ahandle
;
// ahandle assigned by client
uint32_t
code
;
// del later
uint32_t
msgType
;
...
...
source/libs/transport/inc/transLog.h
浏览文件 @
5401c687
...
...
@@ -22,6 +22,7 @@ extern "C" {
// clang-format off
#include "tlog.h"
#include "ttrace.h"
#define tFatal(...) do {if (rpcDebugFlag & DEBUG_FATAL){ taosPrintLog("RPC FATAL ", DEBUG_FATAL, rpcDebugFlag, __VA_ARGS__); }} while (0)
#define tError(...)do { if (rpcDebugFlag & DEBUG_ERROR){ taosPrintLog("RPC ERROR ", DEBUG_ERROR, rpcDebugFlag, __VA_ARGS__); } } while(0)
...
...
@@ -30,6 +31,10 @@ extern "C" {
#define tDebug(...) do {if (rpcDebugFlag & DEBUG_DEBUG){ taosPrintLog("RPC ", DEBUG_DEBUG, rpcDebugFlag, __VA_ARGS__); }} while(0)
#define tTrace(...) do {if (rpcDebugFlag & DEBUG_TRACE){ taosPrintLog("RPC ", DEBUG_TRACE, rpcDebugFlag, __VA_ARGS__); }} while(0)
#define tDump(x, y) do {if (rpcDebugFlag & DEBUG_DUMP) { taosDumpData((unsigned char *)x, y); } } while(0)
//#define tTR(param, ...) do { char buf[40] = {0};TRACE_TO_STR(trace, buf);tTrace("TRID: %s "param, buf, __VA_ARGS__);} while(0)
#define tTR(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); tTrace(param " TRID: %s", __VA_ARGS__, buf);} while(0)
// clang-format on
#ifdef __cplusplus
}
...
...
source/libs/transport/src/trans.c
浏览文件 @
5401c687
...
...
@@ -163,6 +163,11 @@ void rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) {
transSetDefaultAddr
(
thandle
,
ip
,
fqdn
);
}
// void rpcSetMsgTraceId(SRpcMsg* pMsg, STraceId uid) {
// SRpcHandleInfo* pInfo = &pMsg->info;
// pInfo->traceId = uid;
//}
int32_t
rpcInit
()
{
// impl later
return
0
;
...
...
source/libs/transport/src/transCli.c
浏览文件 @
5401c687
...
...
@@ -48,9 +48,10 @@ typedef struct SCliMsg {
STransConnCtx
*
ctx
;
STransMsg
msg
;
queue
q
;
uint64_t
st
;
STransMsgType
type
;
int
sent
;
//(0: no send, 1: alread sent)
uint64_t
st
;
int
sent
;
//(0: no send, 1: alread sent)
}
SCliMsg
;
typedef
struct
SCliThrdObj
{
...
...
@@ -167,10 +168,10 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
snprintf(key, sizeof(key), "%s:%d", ip, (int)port); \
} while (0)
#define CONN_HOST_THREAD_I
NDE
X(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1)
#define CONN_PERSIST_TIME(para)
(para * 1000 * 10)
#define CONN_GET_HOST_THREAD(conn)
(conn ? ((SCliConn*)conn)->hostThrd : NULL)
#define CONN_GET_INST_LABEL(conn)
(((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label)
#define CONN_HOST_THREAD_I
D
X(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1)
#define CONN_PERSIST_TIME(para) (para * 1000 * 10)
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label)
#define CONN_SHOULD_RELEASE(conn, head) \
do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
...
...
@@ -280,6 +281,7 @@ void cliHandleResp(SCliConn* conn) {
transMsg
.
code
=
pHead
->
code
;
transMsg
.
msgType
=
pHead
->
msgType
;
transMsg
.
info
.
ahandle
=
NULL
;
transMsg
.
info
.
traceId
=
pHead
->
traceId
;
SCliMsg
*
pMsg
=
NULL
;
STransConnCtx
*
pCtx
=
NULL
;
...
...
@@ -324,18 +326,20 @@ void cliHandleResp(SCliConn* conn) {
transMsg
.
info
.
handle
=
conn
;
tDebug
(
"%s conn %p ref by app"
,
CONN_GET_INST_LABEL
(
conn
),
conn
);
}
tDebug
(
"%s conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, code: %d"
,
pTransInst
->
label
,
conn
,
TMSG_INFO
(
pHead
->
msgType
),
taosInetNtoa
(
conn
->
addr
.
sin_addr
),
ntohs
(
conn
->
addr
.
sin_port
),
taosInetNtoa
(
conn
->
localAddr
.
sin_addr
),
ntohs
(
conn
->
localAddr
.
sin_port
),
transMsg
.
contLen
,
transMsg
.
code
);
// char buf[64] = {0};
// TRACE_TO_STR(&transMsg.info.traceId, buf);
STraceId
*
trace
=
&
transMsg
.
info
.
traceId
;
tTR
(
"conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, code: %d"
,
conn
,
TMSG_INFO
(
pHead
->
msgType
),
taosInetNtoa
(
conn
->
addr
.
sin_addr
),
ntohs
(
conn
->
addr
.
sin_port
),
taosInetNtoa
(
conn
->
localAddr
.
sin_addr
),
ntohs
(
conn
->
localAddr
.
sin_port
),
transMsg
.
contLen
,
transMsg
.
code
);
if
(
pCtx
==
NULL
&&
CONN_NO_PERSIST_BY_APP
(
conn
))
{
t
Trace
(
"%s except, server continue send while cli ignore it"
,
CONN_GET_INST_LABEL
(
conn
));
t
Debug
(
"%s except, server continue send while cli ignore it"
,
CONN_GET_INST_LABEL
(
conn
));
// transUnrefCliHandle(conn);
return
;
}
if
(
CONN_RELEASE_BY_SERVER
(
conn
)
&&
transMsg
.
info
.
ahandle
==
NULL
)
{
t
Trace
(
"%s except, server continue send while cli ignore it"
,
CONN_GET_INST_LABEL
(
conn
));
t
Debug
(
"%s except, server continue send while cli ignore it"
,
CONN_GET_INST_LABEL
(
conn
));
// transUnrefCliHandle(conn);
return
;
}
...
...
@@ -639,11 +643,16 @@ void cliSend(SCliConn* pConn) {
pHead
->
msgLen
=
(
int32_t
)
htonl
((
uint32_t
)
msgLen
);
pHead
->
release
=
REQUEST_RELEASE_HANDLE
(
pCliMsg
)
?
1
:
0
;
memcpy
(
pHead
->
user
,
pTransInst
->
user
,
strlen
(
pTransInst
->
user
));
pHead
->
traceId
=
pMsg
->
info
.
traceId
;
uv_buf_t
wb
=
uv_buf_init
((
char
*
)
pHead
,
msgLen
);
tDebug
(
"%s conn %p %s is send to %s:%d, local info %s:%d"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
,
TMSG_INFO
(
pHead
->
msgType
),
taosInetNtoa
(
pConn
->
addr
.
sin_addr
),
ntohs
(
pConn
->
addr
.
sin_port
),
taosInetNtoa
(
pConn
->
localAddr
.
sin_addr
),
ntohs
(
pConn
->
localAddr
.
sin_port
));
// char buf[64] = {0};
// TRACE_TO_STR(&pMsg->info.traceId, buf);
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
tTR
(
"conn %p %s is sent to %s:%d, local info %s:%d"
,
pConn
,
TMSG_INFO
(
pHead
->
msgType
),
taosInetNtoa
(
pConn
->
addr
.
sin_addr
),
ntohs
(
pConn
->
addr
.
sin_port
),
taosInetNtoa
(
pConn
->
localAddr
.
sin_addr
),
ntohs
(
pConn
->
localAddr
.
sin_port
));
if
(
pHead
->
persist
==
1
)
{
CONN_SET_PERSIST_BY_APP
(
pConn
);
...
...
@@ -723,7 +732,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) {
if
(
conn
!=
NULL
)
{
tTrace
(
"%s conn %p get from conn pool"
,
CONN_GET_INST_LABEL
(
conn
),
conn
);
}
else
{
tTrace
(
"
not found conn in conn pool %p"
,
pThrd
->
pool
);
tTrace
(
"
%s not found conn in conn pool %p"
,
((
STrans
*
)
pThrd
->
pTransInst
)
->
label
,
pThrd
->
pool
);
}
}
return
conn
;
...
...
@@ -1007,17 +1016,18 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
}
}
STraceId
*
trace
=
&
pResp
->
info
.
traceId
;
if
(
pCtx
->
pSem
!=
NULL
)
{
tT
race
(
"%s conn %p(sync) handle resp"
,
pTransInst
->
label
,
pConn
);
tT
R
(
"conn %p(sync) handle resp"
,
pConn
);
if
(
pCtx
->
pRsp
==
NULL
)
{
tT
race
(
"%s conn %p(sync) failed to resp, ignore"
,
pTransInst
->
label
,
pConn
);
tT
R
(
"conn %p(sync) failed to resp, ignore"
,
pConn
);
}
else
{
memcpy
((
char
*
)
pCtx
->
pRsp
,
(
char
*
)
pResp
,
sizeof
(
*
pResp
));
}
tsem_post
(
pCtx
->
pSem
);
pCtx
->
pRsp
=
NULL
;
}
else
{
tT
race
(
"%s conn %p handle resp"
,
pTransInst
->
label
,
pConn
);
tT
R
(
"conn %p handle resp"
,
pConn
);
if
(
pResp
->
code
!=
0
||
pCtx
->
retryCount
==
0
||
transEpSetIsEqual
(
&
pCtx
->
epSet
,
&
pCtx
->
origEpSet
))
{
pTransInst
->
cfp
(
pTransInst
->
parent
,
pResp
,
NULL
);
}
else
{
...
...
@@ -1074,16 +1084,17 @@ void transReleaseCliHandle(void* handle) {
void
transSendRequest
(
void
*
shandle
,
const
SEpSet
*
pEpSet
,
STransMsg
*
pReq
,
STransCtx
*
ctx
)
{
STrans
*
pTransInst
=
(
STrans
*
)
shandle
;
int
i
ndex
=
CONN_HOST_THREAD_INDE
X
((
SCliConn
*
)
pReq
->
info
.
handle
);
if
(
i
nde
x
==
-
1
)
{
i
nde
x
=
cliRBChoseIdx
(
pTransInst
);
int
i
dx
=
CONN_HOST_THREAD_ID
X
((
SCliConn
*
)
pReq
->
info
.
handle
);
if
(
i
d
x
==
-
1
)
{
i
d
x
=
cliRBChoseIdx
(
pTransInst
);
}
TRACE_SET_MSGID
(
&
pReq
->
info
.
traceId
,
tGenIdPI64
());
STransConnCtx
*
pCtx
=
taosMemoryCalloc
(
1
,
sizeof
(
STransConnCtx
));
pCtx
->
epSet
=
*
pEpSet
;
pCtx
->
ahandle
=
pReq
->
info
.
ahandle
;
pCtx
->
msgType
=
pReq
->
msgType
;
pCtx
->
hThrdIdx
=
i
nde
x
;
pCtx
->
hThrdIdx
=
i
d
x
;
if
(
ctx
!=
NULL
)
{
pCtx
->
appCtx
=
*
ctx
;
...
...
@@ -1096,28 +1107,30 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra
cliMsg
->
st
=
taosGetTimestampUs
();
cliMsg
->
type
=
Normal
;
SCliThrdObj
*
thrd
=
((
SCliObj
*
)
pTransInst
->
tcphandle
)
->
pThreadObj
[
i
nde
x
];
SCliThrdObj
*
thrd
=
((
SCliObj
*
)
pTransInst
->
tcphandle
)
->
pThreadObj
[
i
d
x
];
tDebug
(
"%s send request at thread:%d, threadID: %08"
PRId64
", msg: %p, dst: %s:%d, app:%p"
,
pTransInst
->
label
,
index
,
thrd
->
pid
,
pReq
,
EPSET_GET_INUSE_IP
(
&
pCtx
->
epSet
),
EPSET_GET_INUSE_PORT
(
&
pCtx
->
epSet
)
,
pReq
->
info
.
ahandle
);
STraceId
*
trace
=
&
pReq
->
info
.
traceId
;
tTR
(
"%s send request at thread:%08"
PRId64
", dst: %s:%d, app:%p"
,
pTransInst
->
label
,
thrd
->
pid
,
EPSET_GET_INUSE_IP
(
&
pCtx
->
epSet
),
EPSET_GET_INUSE_PORT
(
&
pCtx
->
epSet
),
pReq
->
info
.
ahandle
);
ASSERT
(
transSendAsync
(
thrd
->
asyncPool
,
&
(
cliMsg
->
q
))
==
0
);
}
void
transSendRecv
(
void
*
shandle
,
const
SEpSet
*
pEpSet
,
STransMsg
*
pReq
,
STransMsg
*
pRsp
)
{
STrans
*
pTransInst
=
(
STrans
*
)
shandle
;
int
i
ndex
=
CONN_HOST_THREAD_INDE
X
(
pReq
->
info
.
handle
);
if
(
i
nde
x
==
-
1
)
{
i
nde
x
=
cliRBChoseIdx
(
pTransInst
);
int
i
dx
=
CONN_HOST_THREAD_ID
X
(
pReq
->
info
.
handle
);
if
(
i
d
x
==
-
1
)
{
i
d
x
=
cliRBChoseIdx
(
pTransInst
);
}
tsem_t
*
sem
=
taosMemoryCalloc
(
1
,
sizeof
(
tsem_t
));
tsem_init
(
sem
,
0
,
0
);
TRACE_SET_MSGID
(
&
pReq
->
info
.
traceId
,
tGenIdPI64
());
STransConnCtx
*
pCtx
=
taosMemoryCalloc
(
1
,
sizeof
(
STransConnCtx
));
pCtx
->
epSet
=
*
pEpSet
;
pCtx
->
ahandle
=
pReq
->
info
.
ahandle
;
pCtx
->
msgType
=
pReq
->
msgType
;
pCtx
->
hThrdIdx
=
i
nde
x
;
pCtx
->
hThrdIdx
=
i
d
x
;
pCtx
->
pSem
=
sem
;
pCtx
->
pRsp
=
pRsp
;
...
...
@@ -1127,16 +1140,17 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM
cliMsg
->
st
=
taosGetTimestampUs
();
cliMsg
->
type
=
Normal
;
SCliThrdObj
*
thrd
=
((
SCliObj
*
)
pTransInst
->
tcphandle
)
->
pThreadObj
[
index
];
tDebug
(
"%s send request at thread:%d, threadID:%08"
PRId64
", msg: %p, dst: %s:%d, app:%p"
,
pTransInst
->
label
,
index
,
thrd
->
pid
,
pReq
,
EPSET_GET_INUSE_IP
(
&
pCtx
->
epSet
),
EPSET_GET_INUSE_PORT
(
&
pCtx
->
epSet
),
pReq
->
info
.
ahandle
);
SCliThrdObj
*
thrd
=
((
SCliObj
*
)
pTransInst
->
tcphandle
)
->
pThreadObj
[
idx
];
STraceId
*
trace
=
&
pReq
->
info
.
traceId
;
tTR
(
"%s send request at thread:%08"
PRId64
", dst: %s:%d, app:%p"
,
pTransInst
->
label
,
thrd
->
pid
,
EPSET_GET_INUSE_IP
(
&
pCtx
->
epSet
),
EPSET_GET_INUSE_PORT
(
&
pCtx
->
epSet
),
pReq
->
info
.
ahandle
);
transSendAsync
(
thrd
->
asyncPool
,
&
(
cliMsg
->
q
));
tsem_wait
(
sem
);
tsem_destroy
(
sem
);
taosMemoryFree
(
sem
);
}
/*
*
**/
...
...
@@ -1159,7 +1173,7 @@ void transSetDefaultAddr(void* ahandle, const char* ip, const char* fqdn) {
cliMsg
->
type
=
Update
;
SCliThrdObj
*
thrd
=
((
SCliObj
*
)
pTransInst
->
tcphandle
)
->
pThreadObj
[
i
];
tDebug
(
"%s update epset at thread:%
d, threadID:%08"
PRId64
""
,
pTransInst
->
label
,
i
,
thrd
->
pid
);
tDebug
(
"%s update epset at thread:%
08"
PRId64
""
,
pTransInst
->
label
,
thrd
->
pid
);
transSendAsync
(
thrd
->
asyncPool
,
&
(
cliMsg
->
q
));
}
...
...
source/libs/transport/src/transComm.c
浏览文件 @
5401c687
...
...
@@ -376,17 +376,18 @@ static void transDQTimeout(uv_timer_t* timer) {
SDelayQueue
*
queue
=
timer
->
data
;
tTrace
(
"timer %p timeout"
,
timer
);
uint64_t
timeout
=
0
;
int64_t
current
=
taosGetTimestampMs
();
do
{
HeapNode
*
minNode
=
heapMin
(
queue
->
heap
);
if
(
minNode
==
NULL
)
break
;
SDelayTask
*
task
=
container_of
(
minNode
,
SDelayTask
,
node
);
if
(
task
->
execTime
<=
taosGetTimestampMs
()
)
{
if
(
task
->
execTime
<=
current
)
{
heapRemove
(
queue
->
heap
,
minNode
);
task
->
func
(
task
->
arg
);
taosMemoryFree
(
task
);
timeout
=
0
;
}
else
{
timeout
=
task
->
execTime
-
taosGetTimestampMs
()
;
timeout
=
task
->
execTime
-
current
;
break
;
}
}
while
(
1
);
...
...
source/libs/transport/src/transSvr.c
浏览文件 @
5401c687
...
...
@@ -270,11 +270,8 @@ static void uvHandleReq(SSvrConn* pConn) {
transMsg
.
msgType
=
pHead
->
msgType
;
transMsg
.
code
=
pHead
->
code
;
transMsg
.
info
.
ahandle
=
(
void
*
)
pHead
->
ahandle
;
transMsg
.
info
.
handle
=
NULL
;
// transDestroyBuffer(&pConn->readBuf);
transClearBuffer
(
&
pConn
->
readBuf
);
pConn
->
inType
=
pHead
->
msgType
;
if
(
pConn
->
status
==
ConnNormal
)
{
if
(
pHead
->
persist
==
1
)
{
...
...
@@ -283,16 +280,18 @@ static void uvHandleReq(SSvrConn* pConn) {
tDebug
(
"conn %p acquired by server app"
,
pConn
);
}
}
STraceId
*
trace
=
&
pHead
->
traceId
;
if
(
pConn
->
status
==
ConnNormal
&&
pHead
->
noResp
==
0
)
{
transRefSrvHandle
(
pConn
);
tDebug
(
"conn %p %s received from %s:%d, local info: %s:%d, msg size: %d"
,
pConn
,
TMSG_INFO
(
transMsg
.
msgType
),
taosInetNtoa
(
pConn
->
addr
.
sin_addr
),
ntohs
(
pConn
->
addr
.
sin_port
),
taosInetNtoa
(
pConn
->
localAddr
.
sin_addr
),
ntohs
(
pConn
->
localAddr
.
sin_port
),
transMsg
.
contLen
);
tTR
(
"conn %p %s received from %s:%d, local info: %s:%d, msg size: %d"
,
pConn
,
TMSG_INFO
(
transMsg
.
msgType
),
taosInetNtoa
(
pConn
->
addr
.
sin_addr
),
ntohs
(
pConn
->
addr
.
sin_port
),
taosInetNtoa
(
pConn
->
localAddr
.
sin_addr
),
ntohs
(
pConn
->
localAddr
.
sin_port
),
transMsg
.
contLen
);
}
else
{
t
Debug
(
"conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d, code: %d"
,
pConn
,
TMSG_INFO
(
transMsg
.
msgType
),
taosInetNtoa
(
pConn
->
addr
.
sin_addr
),
ntohs
(
pConn
->
addr
.
sin_port
),
taosInetNtoa
(
pConn
->
localAddr
.
sin_addr
),
ntohs
(
pConn
->
localAddr
.
sin_port
),
transMsg
.
contLen
,
pHead
->
noResp
,
transMsg
.
code
);
t
TR
(
"conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d, code: %d"
,
pConn
,
TMSG_INFO
(
transMsg
.
msgType
),
taosInetNtoa
(
pConn
->
addr
.
sin_addr
),
ntohs
(
pConn
->
addr
.
sin_port
),
taosInetNtoa
(
pConn
->
localAddr
.
sin_addr
),
ntohs
(
pConn
->
localAddr
.
sin_port
),
transMsg
.
contLen
,
pHead
->
noResp
,
transMsg
.
code
);
// no ref here
}
...
...
@@ -300,11 +299,14 @@ static void uvHandleReq(SSvrConn* pConn) {
// 1. server application should not send resp on handle
// 2. once send out data, cli conn released to conn pool immediately
// 3. not mixed with persist
transMsg
.
info
.
ahandle
=
(
void
*
)
pHead
->
ahandle
;
transMsg
.
info
.
handle
=
(
void
*
)
transAcquireExHandle
(
refMgt
,
pConn
->
refId
);
transMsg
.
info
.
refId
=
pConn
->
refId
;
tTrace
(
"handle %p conn: %p translated to app, refId: %"
PRIu64
""
,
transMsg
.
info
.
handle
,
pConn
,
pConn
->
refId
);
transMsg
.
info
.
traceId
=
pHead
->
traceId
;
tTR
(
"handle %p conn: %p translated to app, refId: %"
PRIu64
""
,
transMsg
.
info
.
handle
,
pConn
,
pConn
->
refId
);
assert
(
transMsg
.
info
.
handle
!=
NULL
);
if
(
pHead
->
noResp
==
1
)
{
transMsg
.
info
.
refId
=
-
1
;
}
...
...
@@ -424,8 +426,6 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) {
}
static
void
uvPrepareSendData
(
SSvrMsg
*
smsg
,
uv_buf_t
*
wb
)
{
tTrace
(
"conn %p prepare to send resp"
,
smsg
->
pConn
);
SSvrConn
*
pConn
=
smsg
->
pConn
;
STransMsg
*
pMsg
=
&
smsg
->
msg
;
if
(
pMsg
->
pCont
==
0
)
{
...
...
@@ -434,6 +434,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
}
STransMsgHead
*
pHead
=
transHeadFromCont
(
pMsg
->
pCont
);
pHead
->
ahandle
=
(
uint64_t
)
pMsg
->
info
.
ahandle
;
pHead
->
traceId
=
pMsg
->
info
.
traceId
;
if
(
pConn
->
status
==
ConnNormal
)
{
pHead
->
msgType
=
pConn
->
inType
+
1
;
...
...
@@ -454,9 +455,11 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
char
*
msg
=
(
char
*
)
pHead
;
int32_t
len
=
transMsgLenFromCont
(
pMsg
->
contLen
);
tDebug
(
"conn %p %s is sent to %s:%d, local info: %s:%d, msglen:%d"
,
pConn
,
TMSG_INFO
(
pHead
->
msgType
),
taosInetNtoa
(
pConn
->
addr
.
sin_addr
),
ntohs
(
pConn
->
addr
.
sin_port
),
taosInetNtoa
(
pConn
->
localAddr
.
sin_addr
),
ntohs
(
pConn
->
localAddr
.
sin_port
),
len
);
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
tTR
(
"conn %p %s is sent to %s:%d, local info: %s:%d, msglen:%d"
,
pConn
,
TMSG_INFO
(
pHead
->
msgType
),
taosInetNtoa
(
pConn
->
addr
.
sin_addr
),
ntohs
(
pConn
->
addr
.
sin_port
),
taosInetNtoa
(
pConn
->
localAddr
.
sin_addr
),
ntohs
(
pConn
->
localAddr
.
sin_port
),
len
);
pHead
->
msgLen
=
htonl
(
len
);
wb
->
base
=
msg
;
...
...
source/util/src/ttrace.c
浏览文件 @
5401c687
...
...
@@ -18,51 +18,62 @@
#include "tuuid.h"
// clang-format off
static
TdThreadOnce
init
=
PTHREAD_ONCE_INIT
;
static
void
*
ids
=
NULL
;
static
TdThreadMutex
mtx
;
void
traceInit
()
{
ids
=
taosHashInit
(
4096
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_ENTRY_LOCK
);
taosThreadMutexInit
(
&
mtx
,
NULL
);
}
void
traceCreateEnv
()
{
taosThreadOnce
(
&
init
,
traceInit
);
}
void
traceDestroyEnv
()
{
taosThreadMutexDestroy
(
&
mtx
);
taosHashCleanup
(
ids
);
}
STraceId
traceInitId
(
STraceSubId
*
h
,
STraceSubId
*
l
)
{
STraceId
id
=
*
h
;
id
=
((
id
<<
32
)
&
0xFFFFFFFF
)
|
((
*
l
)
&
0xFFFFFFFF
);
return
id
;
}
void
traceId2Str
(
STraceId
*
id
,
char
*
buf
)
{
int32_t
f
=
(
*
id
>>
32
)
&
0xFFFFFFFF
;
int32_t
s
=
(
*
id
)
&
0xFFFFFFFF
;
sprintf
(
buf
,
"%d:%d"
,
f
,
s
);
}
void
traceSetSubId
(
STraceId
*
id
,
STraceSubId
*
subId
)
{
int32_t
parent
=
((
*
id
>>
32
)
&
0xFFFFFFFF
);
taosThreadMutexLock
(
&
mtx
);
taosHashPut
(
ids
,
subId
,
sizeof
(
*
subId
),
&
parent
,
sizeof
(
parent
));
taosThreadMutexUnlock
(
&
mtx
);
}
STraceSubId
traceGetParentId
(
STraceId
*
id
)
{
int32_t
parent
=
((
*
id
>>
32
)
&
0xFFFFFFFF
);
taosThreadMutexLock
(
&
mtx
);
STraceSubId
*
p
=
taosHashGet
(
ids
,
(
void
*
)
&
parent
,
sizeof
(
parent
));
parent
=
*
p
;
taosThreadMutexUnlock
(
&
mtx
);
return
parent
;
}
STraceSubId
traceGenSubId
()
{
return
tGenIdPI32
();
}
//static TdThreadOnce init = PTHREAD_ONCE_INIT;
//static void * ids = NULL;
//static TdThreadMutex mtx;
//
//void traceInit() {
// ids = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
// taosThreadMutexInit(&mtx, NULL);
//}
//void traceCreateEnv() {
// taosThreadOnce(&init, traceInit);
//}
//void traceDestroyEnv() {
// taosThreadMutexDestroy(&mtx);
// taosHashCleanup(ids);
//}
//
//STraceId traceInitId(STraceSubId *h, STraceSubId *l) {
// STraceId id = *h;
// id = ((id << 32) & 0xFFFFFFFF) | ((*l) & 0xFFFFFFFF);
// return id;
//}
//void traceId2Str(STraceId *id, char *buf) {
// int32_t f = (*id >> 32) & 0xFFFFFFFF;
// int32_t s = (*id) & 0xFFFFFFFF;
// sprintf(buf, "%d:%d", f, s);
//}
//
//void traceSetSubId(STraceId *id, STraceSubId *subId) {
// int32_t parent = ((*id >> 32) & 0xFFFFFFFF);
// taosThreadMutexLock(&mtx);
// taosHashPut(ids, subId, sizeof(*subId), &parent, sizeof(parent));
// taosThreadMutexUnlock(&mtx);
//}
//
//STraceSubId traceGetParentId(STraceId *id) {
// int32_t parent = ((*id >> 32) & 0xFFFFFFFF);
// taosThreadMutexLock(&mtx);
// STraceSubId *p = taosHashGet(ids, (void *)&parent, sizeof(parent));
// parent = *p;
// taosThreadMutexUnlock(&mtx);
//
// return parent;
//}
//
//STraceSubId traceGenSubId() {
// return tGenIdPI32();
//}
//void traceSetRootId(STraceId *traceid, int64_t rootId) {
// traceId->rootId = rootId;
//}
//int64_t traceGetRootId(STraceId *traceId);
//
//void traceSetMsgId(STraceId *traceid, int64_t msgId);
//int64_t traceGetMsgId(STraceId *traceid);
//
//char *trace2Str(STraceId *id);
//
//void traceSetSubId(STraceId *id, int32_t *subId);
// clang-format on
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录