Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
dca93b18
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
dca93b18
编写于
7月 08, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor: adjust logs
上级
389a4fba
变更
15
隐藏空白更改
内联
并排
Showing
15 changed file
with
91 addition
and
91 deletion
+91
-91
source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
+1
-1
source/dnode/mgmt/node_mgmt/src/dmNodes.c
source/dnode/mgmt/node_mgmt/src/dmNodes.c
+1
-1
source/dnode/mnode/impl/src/mndDnode.c
source/dnode/mnode/impl/src/mndDnode.c
+1
-1
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+3
-3
source/libs/function/inc/fnLog.h
source/libs/function/inc/fnLog.h
+1
-1
source/libs/function/src/tudf.c
source/libs/function/src/tudf.c
+6
-6
source/libs/tdb/src/db/tdbPager.c
source/libs/tdb/src/db/tdbPager.c
+1
-1
source/libs/transport/inc/transComm.h
source/libs/transport/inc/transComm.h
+24
-24
source/libs/transport/src/trans.c
source/libs/transport/src/trans.c
+3
-3
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+18
-18
source/libs/transport/src/transComm.c
source/libs/transport/src/transComm.c
+3
-3
source/libs/transport/src/transSvr.c
source/libs/transport/src/transSvr.c
+16
-16
source/libs/transport/test/pushServer.c
source/libs/transport/test/pushServer.c
+1
-1
source/libs/wal/src/walRead.c
source/libs/wal/src/walRead.c
+11
-11
source/libs/wal/src/walWrite.c
source/libs/wal/src/walWrite.c
+1
-1
未找到文件。
source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
浏览文件 @
dca93b18
...
...
@@ -21,7 +21,7 @@ extern SConfig *tsCfg;
static
void
dmUpdateDnodeCfg
(
SDnodeMgmt
*
pMgmt
,
SDnodeCfg
*
pCfg
)
{
if
(
pMgmt
->
pData
->
dnodeId
==
0
||
pMgmt
->
pData
->
clusterId
==
0
)
{
dInfo
(
"set dnodeId:%d clusterId:%"
PRId64
,
pCfg
->
dnodeId
,
pCfg
->
clusterId
);
dInfo
(
"set
local info,
dnodeId:%d clusterId:%"
PRId64
,
pCfg
->
dnodeId
,
pCfg
->
clusterId
);
taosThreadRwlockWrlock
(
&
pMgmt
->
pData
->
lock
);
pMgmt
->
pData
->
dnodeId
=
pCfg
->
dnodeId
;
pMgmt
->
pData
->
clusterId
=
pCfg
->
clusterId
;
...
...
source/dnode/mgmt/node_mgmt/src/dmNodes.c
浏览文件 @
dca93b18
...
...
@@ -277,7 +277,7 @@ int32_t dmRunDnode(SDnode *pDnode) {
while
(
1
)
{
if
(
pDnode
->
stop
)
{
dInfo
(
"
dnod
e is about to stop"
);
dInfo
(
"
TDengin
e is about to stop"
);
dmSetStatus
(
pDnode
,
DND_STAT_STOPPED
);
dmStopNodes
(
pDnode
);
dmCloseNodes
(
pDnode
);
...
...
source/dnode/mnode/impl/src/mndDnode.c
浏览文件 @
dca93b18
...
...
@@ -406,7 +406,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
}
if
(
statusReq
.
dnodeId
==
0
)
{
mInfo
(
"dnode:%d, %s first access,
set clusterId
%"
PRId64
,
pDnode
->
id
,
pDnode
->
ep
,
pMnode
->
clusterId
);
mInfo
(
"dnode:%d, %s first access,
clusterId:
%"
PRId64
,
pDnode
->
id
,
pDnode
->
ep
,
pMnode
->
clusterId
);
}
else
{
if
(
statusReq
.
clusterId
!=
pMnode
->
clusterId
)
{
if
(
pDnode
!=
NULL
)
{
...
...
source/libs/executor/src/executil.c
浏览文件 @
dca93b18
...
...
@@ -323,12 +323,12 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
if
(
code
==
TSDB_CODE_INDEX_REBUILDING
)
{
code
=
vnodeGetAllTableList
(
pVnode
,
tableUid
,
pListInfo
->
pTableList
);
}
else
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"failed
to get tableIds, reason: %s, suid: %"
PRIu64
""
,
tstrerror
(
code
),
tableUid
);
qError
(
"failed
to get tableIds, reason:%s, suid:%"
PRIu64
,
tstrerror
(
code
),
tableUid
);
taosArrayDestroy
(
res
);
terrno
=
code
;
return
code
;
}
else
{
qDebug
(
"suc
ess to get tableIds, size: %d, suid: %"
PRIu64
""
,
(
int
)
taosArrayGetSize
(
res
),
tableUid
);
qDebug
(
"suc
cess to get tableIds, size:%d, suid:%"
PRIu64
,
(
int
)
taosArrayGetSize
(
res
),
tableUid
);
}
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
res
);
i
++
)
{
...
...
@@ -341,7 +341,7 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
}
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"failed
to get tableIds, reason: %s, suid: %"
PRIu64
""
,
tstrerror
(
code
),
tableUid
);
qError
(
"failed
to get tableIds, reason:%s, suid:%"
PRIu64
,
tstrerror
(
code
),
tableUid
);
terrno
=
code
;
return
code
;
}
...
...
source/libs/function/inc/fnLog.h
浏览文件 @
dca93b18
...
...
@@ -14,7 +14,7 @@ extern "C" {
#define fnFatal(...) { if (udfDebugFlag & DEBUG_FATAL) { taosPrintLog("UDF FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }}
#define fnError(...) { if (udfDebugFlag & DEBUG_ERROR) { taosPrintLog("UDF ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }}
#define fnWarn(...) { if (udfDebugFlag & DEBUG_WARN) { taosPrintLog("UDF WARN ", DEBUG_WARN, 255, __VA_ARGS__); }}
#define fnInfo(...) { if (udfDebugFlag & DEBUG_INFO) { taosPrintLog("UDF
", DEBUG_INFO, 255, __VA_ARGS__); }}
#define fnInfo(...) { if (udfDebugFlag & DEBUG_INFO) { taosPrintLog("UDF ", DEBUG_INFO, 255, __VA_ARGS__); }}
#define fnDebug(...) { if (udfDebugFlag & DEBUG_DEBUG) { taosPrintLog("UDF ", DEBUG_DEBUG, udfDebugFlag, __VA_ARGS__); }}
#define fnTrace(...) { if (udfDebugFlag & DEBUG_TRACE) { taosPrintLog("UDF ", DEBUG_TRACE, udfDebugFlag, __VA_ARGS__); }}
// clang-format on
...
...
source/libs/function/src/tudf.c
浏览文件 @
dca93b18
...
...
@@ -203,7 +203,7 @@ int32_t udfStartUdfd(int32_t startDnodeId) {
uv_async_send
(
&
pData
->
stopAsync
);
uv_thread_join
(
&
pData
->
thread
);
pData
->
needCleanUp
=
false
;
fnInfo
(
"
dnode udfd
cleaned up after spawn err"
);
fnInfo
(
"
udfd is
cleaned up after spawn err"
);
}
else
{
pData
->
needCleanUp
=
true
;
}
...
...
@@ -212,7 +212,7 @@ int32_t udfStartUdfd(int32_t startDnodeId) {
int32_t
udfStopUdfd
()
{
SUdfdData
*
pData
=
&
udfdGlobal
;
fnInfo
(
"
dnode to stop udfd. need cleanup: %d, spawn err:
%d"
,
fnInfo
(
"
udfd start to stop, need cleanup:%d, spawn err:
%d"
,
pData
->
needCleanUp
,
pData
->
spawnErr
);
if
(
!
pData
->
needCleanUp
||
atomic_load_32
(
&
pData
->
stopCalled
))
{
return
0
;
...
...
@@ -225,7 +225,7 @@ int32_t udfStopUdfd() {
#ifdef WINDOWS
if
(
pData
->
jobHandle
!=
NULL
)
CloseHandle
(
pData
->
jobHandle
);
#endif
fnInfo
(
"
dnode udfd
cleaned up"
);
fnInfo
(
"
udfd is
cleaned up"
);
return
0
;
}
...
...
@@ -467,7 +467,7 @@ int32_t getUdfdPipeName(char* pipeName, int32_t size) {
size_t
dnodeIdSize
=
sizeof
(
dnodeId
);
int32_t
err
=
uv_os_getenv
(
UDF_DNODE_ID_ENV_NAME
,
dnodeId
,
&
dnodeIdSize
);
if
(
err
!=
0
)
{
fnError
(
"
get dnode id from env. error: %s.
"
,
uv_err_name
(
err
));
fnError
(
"
failed to get dnodeId from env since %s
"
,
uv_err_name
(
err
));
dnodeId
[
0
]
=
'1'
;
}
#ifdef _WIN32
...
...
@@ -475,7 +475,7 @@ int32_t getUdfdPipeName(char* pipeName, int32_t size) {
#else
snprintf
(
pipeName
,
size
,
"%s/%s%s"
,
tsDataDir
,
UDF_LISTEN_PIPE_NAME_PREFIX
,
dnodeId
);
#endif
fnInfo
(
"get dnode
id from env. dnode id: %s. pipe path:
%s"
,
dnodeId
,
pipeName
);
fnInfo
(
"get dnode
Id:%s from env, pipe path:
%s"
,
dnodeId
,
pipeName
);
return
0
;
}
...
...
@@ -1609,7 +1609,7 @@ int32_t udfcClose() {
taosArrayDestroy
(
udfc
->
udfStubs
);
uv_mutex_destroy
(
&
udfc
->
udfStubsMutex
);
udfc
->
udfcState
=
UDFC_STATE_INITAL
;
fnInfo
(
"udfc cleaned up"
);
fnInfo
(
"udfc
is
cleaned up"
);
return
0
;
}
...
...
source/libs/tdb/src/db/tdbPager.c
浏览文件 @
dca93b18
...
...
@@ -370,7 +370,7 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage
init
=
1
;
nRead
=
tdbOsPRead
(
pPager
->
fd
,
pPage
->
pData
,
pPage
->
pageSize
,
((
i64
)
pPage
->
pageSize
)
*
(
pgno
-
1
));
tdbTrace
(
"tdbttl pager:%p, pgno:%d, nRead:%
ld"
,
pPager
,
pgno
,
nRead
);
tdbTrace
(
"tdbttl pager:%p, pgno:%d, nRead:%
"
PRId64
,
pPager
,
pgno
,
nRead
);
if
(
nRead
<
pPage
->
pageSize
)
{
ASSERT
(
0
);
return
-
1
;
...
...
source/libs/transport/inc/transComm.h
浏览文件 @
dca93b18
...
...
@@ -249,30 +249,30 @@ int transAsyncSend(SAsyncPool* pool, queue* mq);
} \
} while (0)
#define ASYNC_CHECK_HANDLE(exh1, id)
\
do {
\
if (id > 0) {
\
tTrace("handle step1");
\
SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), id);
\
if (exh2 == NULL || id != exh2->refId) {
\
tTrace("handle %p except, may already freed, ignore msg, ref1:
%" PRIu64 ", ref2 : %" PRIu64 ""
, exh1, \
exh2 ? exh2->refId : 0, id);
\
goto _return1;
\
}
\
} else if (id == 0) {
\
tTrace("handle step2");
\
SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), id);
\
if (exh2 == NULL || id == exh2->refId) {
\
tTrace("handle %p except, may already freed, ignore msg, ref1:
%" PRIu64 ", ref2 : %" PRIu64 ""
, exh1, id, \
exh2 ? exh2->refId : 0);
\
goto _return1;
\
} else {
\
id = exh1->refId;
\
}
\
} else if (id < 0) {
\
tTrace("handle step3");
\
goto _return2;
\
}
\
#define ASYNC_CHECK_HANDLE(exh1, id) \
do { \
if (id > 0) { \
tTrace("handle step1"); \
SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), id); \
if (exh2 == NULL || id != exh2->refId) { \
tTrace("handle %p except, may already freed, ignore msg, ref1:
%" PRIu64 ", ref2:%" PRIu64
, exh1, \
exh2 ? exh2->refId : 0, id); \
goto _return1; \
} \
} else if (id == 0) { \
tTrace("handle step2"); \
SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), id); \
if (exh2 == NULL || id == exh2->refId) { \
tTrace("handle %p except, may already freed, ignore msg, ref1:
%" PRIu64 ", ref2:%" PRIu64
, exh1, id, \
exh2 ? exh2->refId : 0); \
goto _return1; \
} else { \
id = exh1->refId; \
} \
} else if (id < 0) { \
tTrace("handle step3"); \
goto _return2; \
} \
} while (0)
int
transInitBuffer
(
SConnBuffer
*
buf
);
...
...
source/libs/transport/src/trans.c
浏览文件 @
dca93b18
...
...
@@ -58,7 +58,7 @@ void* rpcOpen(const SRpcInit* pInit) {
uint32_t
ip
=
0
;
if
(
pInit
->
connType
==
TAOS_CONN_SERVER
)
{
if
(
transValidLocalFqdn
(
pInit
->
localFqdn
,
&
ip
)
!=
0
)
{
tError
(
"invalid fqdn:
%s, errmsg:
%s"
,
pInit
->
localFqdn
,
terrstr
());
tError
(
"invalid fqdn:
%s, errmsg:
%s"
,
pInit
->
localFqdn
,
terrstr
());
taosMemoryFree
(
pRpc
);
return
NULL
;
}
...
...
@@ -86,7 +86,7 @@ void rpcClose(void* arg) {
tInfo
(
"start to close rpc"
);
transRemoveExHandle
(
transGetInstMgt
(),
(
int64_t
)
arg
);
transReleaseExHandle
(
transGetInstMgt
(),
(
int64_t
)
arg
);
tInfo
(
"
finish to close rpc
"
);
tInfo
(
"
rpc is closed
"
);
return
;
}
void
rpcCloseImpl
(
void
*
arg
)
{
...
...
@@ -112,7 +112,7 @@ void* rpcMallocCont(int32_t contLen) {
void
rpcFreeCont
(
void
*
cont
)
{
if
(
cont
==
NULL
)
return
;
taosMemoryFree
((
char
*
)
cont
-
TRANS_MSG_OVERHEAD
);
tTrace
(
"free mem:
%p"
,
(
char
*
)
cont
-
TRANS_MSG_OVERHEAD
);
tTrace
(
"free mem:%p"
,
(
char
*
)
cont
-
TRANS_MSG_OVERHEAD
);
}
void
*
rpcReallocCont
(
void
*
ptr
,
int32_t
contLen
)
{
...
...
source/libs/transport/src/transCli.c
浏览文件 @
dca93b18
...
...
@@ -184,22 +184,22 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
#define CONN_PERSIST_TIME(para) (para * 1000 * 10)
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label)
#define CONN_SHOULD_RELEASE(conn, head)
\
do {
\
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) {
\
uint64_t ahandle = head->ahandle;
\
CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);
\
transClearBuffer(&conn->readBuf);
\
transFreeMsg(transContFromHead((char*)head));
\
tDebug("%s conn %p receive release request, ref:
%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn)); \
if (T_REF_VAL_GET(conn) > 1) {
\
transUnrefCliHandle(conn);
\
}
\
destroyCmsg(pMsg);
\
cliReleaseUnfinishedMsg(conn);
\
addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn);
\
return;
\
}
\
#define CONN_SHOULD_RELEASE(conn, head) \
do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
uint64_t ahandle = head->ahandle; \
CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); \
transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \
tDebug("%s conn %p receive release request, ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn)); \
if (T_REF_VAL_GET(conn) > 1) { \
transUnrefCliHandle(conn); \
} \
destroyCmsg(pMsg); \
cliReleaseUnfinishedMsg(conn); \
addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn); \
return; \
} \
} while (0)
#define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \
...
...
@@ -353,7 +353,7 @@ void cliHandleResp(SCliConn* conn) {
}
STraceId
*
trace
=
&
transMsg
.
info
.
traceId
;
tGTrace
(
"%s conn %p %s received from %s:%d, local info:
%s:%d, msg size: %d, code: %d
"
,
CONN_GET_INST_LABEL
(
conn
),
tGTrace
(
"%s conn %p %s received from %s:%d, local info:
%s:%d, msg size:%d, code:0x%x
"
,
CONN_GET_INST_LABEL
(
conn
),
conn
,
TMSG_INFO
(
pHead
->
msgType
),
taosInetNtoa
(
conn
->
addr
.
sin_addr
),
ntohs
(
conn
->
addr
.
sin_port
),
taosInetNtoa
(
conn
->
localAddr
.
sin_addr
),
ntohs
(
conn
->
localAddr
.
sin_port
),
transMsg
.
contLen
,
transMsg
.
code
);
...
...
@@ -1294,7 +1294,7 @@ void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
cliMsg
->
refId
=
(
int64_t
)
shandle
;
SCliThrd
*
thrd
=
((
SCliObj
*
)
pTransInst
->
tcphandle
)
->
pThreadObj
[
i
];
tDebug
(
"%s update epset at thread:%08"
PRId64
""
,
pTransInst
->
label
,
thrd
->
pid
);
tDebug
(
"%s update epset at thread:%08"
PRId64
,
pTransInst
->
label
,
thrd
->
pid
);
transAsyncSend
(
thrd
->
asyncPool
,
&
(
cliMsg
->
q
));
}
...
...
source/libs/transport/src/transComm.c
浏览文件 @
dca93b18
...
...
@@ -136,7 +136,7 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
}
else
{
p
->
cap
=
p
->
total
;
p
->
buf
=
taosMemoryRealloc
(
p
->
buf
,
p
->
cap
);
tTrace
(
"internal malloc mem:
%p, size:
%d"
,
p
->
buf
,
p
->
cap
);
tTrace
(
"internal malloc mem:
%p, size:
%d"
,
p
->
buf
,
p
->
cap
);
uvBuf
->
base
=
p
->
buf
+
p
->
len
;
uvBuf
->
len
=
p
->
cap
-
p
->
len
;
...
...
@@ -221,7 +221,7 @@ int transAsyncSend(SAsyncPool* pool, queue* q) {
taosThreadMutexUnlock
(
&
item
->
mtx
);
int64_t
el
=
taosGetTimestampUs
()
-
st
;
if
(
el
>
50
)
{
// tInfo("lock and unlock cost:
%d", (int)el);
// tInfo("lock and unlock cost:%d", (int)el);
}
return
uv_async_send
(
async
);
}
...
...
@@ -446,7 +446,7 @@ int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_
}
}
tTrace
(
"timer %p put task into delay queue, timeoutMs:
%"
PRIu64
""
,
queue
->
timer
,
timeoutMs
);
tTrace
(
"timer %p put task into delay queue, timeoutMs:
%"
PRIu64
,
queue
->
timer
,
timeoutMs
);
heapInsert
(
queue
->
heap
,
&
task
->
node
);
uv_timer_start
(
queue
->
timer
,
transDQTimeout
,
timeoutMs
,
0
);
return
0
;
...
...
source/libs/transport/src/transSvr.c
浏览文件 @
dca93b18
...
...
@@ -245,11 +245,11 @@ static void uvHandleReq(SSvrConn* pConn) {
if
(
pConn
->
status
==
ConnNormal
&&
pHead
->
noResp
==
0
)
{
transRefSrvHandle
(
pConn
);
tGTrace
(
"%s conn %p %s received from %s:%d, local info:
%s:%d, msg size:
%d"
,
transLabel
(
pTransInst
),
pConn
,
tGTrace
(
"%s conn %p %s received from %s:%d, local info:
%s:%d, msg size:
%d"
,
transLabel
(
pTransInst
),
pConn
,
TMSG_INFO
(
transMsg
.
msgType
),
taosInetNtoa
(
pConn
->
addr
.
sin_addr
),
ntohs
(
pConn
->
addr
.
sin_port
),
taosInetNtoa
(
pConn
->
localAddr
.
sin_addr
),
ntohs
(
pConn
->
localAddr
.
sin_port
),
transMsg
.
contLen
);
}
else
{
tGTrace
(
"%s conn %p %s received from %s:%d, local info:
%s:%d, msg size: %d, resp:%d, code:
%d"
,
tGTrace
(
"%s conn %p %s received from %s:%d, local info:
%s:%d, msg size:%d, resp:%d, code:
%d"
,
transLabel
(
pTransInst
),
pConn
,
TMSG_INFO
(
transMsg
.
msgType
),
taosInetNtoa
(
pConn
->
addr
.
sin_addr
),
ntohs
(
pConn
->
addr
.
sin_port
),
taosInetNtoa
(
pConn
->
localAddr
.
sin_addr
),
ntohs
(
pConn
->
localAddr
.
sin_port
),
transMsg
.
contLen
,
pHead
->
noResp
,
transMsg
.
code
);
...
...
@@ -265,7 +265,7 @@ static void uvHandleReq(SSvrConn* pConn) {
transMsg
.
info
.
refId
=
pConn
->
refId
;
transMsg
.
info
.
traceId
=
pHead
->
traceId
;
tGTrace
(
"%s handle %p conn:
%p translated to app, refId: %"
PRIu64
""
,
transLabel
(
pTransInst
),
transMsg
.
info
.
handle
,
tGTrace
(
"%s handle %p conn:
%p translated to app, refId:%"
PRIu64
,
transLabel
(
pTransInst
),
transMsg
.
info
.
handle
,
pConn
,
pConn
->
refId
);
assert
(
transMsg
.
info
.
handle
!=
NULL
);
...
...
@@ -292,7 +292,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
STrans
*
pTransInst
=
conn
->
pTransInst
;
if
(
nread
>
0
)
{
pBuf
->
len
+=
nread
;
tTrace
(
"%s conn %p total read:
%d, current read:
%d"
,
transLabel
(
pTransInst
),
conn
,
pBuf
->
len
,
(
int
)
nread
);
tTrace
(
"%s conn %p total read:
%d, current read:
%d"
,
transLabel
(
pTransInst
),
conn
,
pBuf
->
len
,
(
int
)
nread
);
if
(
transReadComplete
(
pBuf
))
{
tTrace
(
"%s conn %p alread read complete packet"
,
transLabel
(
pTransInst
),
conn
);
uvHandleReq
(
conn
);
...
...
@@ -305,7 +305,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
return
;
}
tError
(
"%s conn %p read error:
%s"
,
transLabel
(
pTransInst
),
conn
,
uv_err_name
(
nread
));
tError
(
"%s conn %p read error:%s"
,
transLabel
(
pTransInst
),
conn
,
uv_err_name
(
nread
));
if
(
nread
<
0
)
{
conn
->
broken
=
true
;
if
(
conn
->
status
==
ConnAcquire
)
{
...
...
@@ -416,7 +416,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
STrans
*
pTransInst
=
pConn
->
pTransInst
;
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
tGTrace
(
"%s conn %p %s is sent to %s:%d, local info:
%s:%d, msglen:%d"
,
transLabel
(
pTransInst
),
pConn
,
tGTrace
(
"%s conn %p %s is sent to %s:%d, local info:%s:%d, msglen:%d"
,
transLabel
(
pTransInst
),
pConn
,
TMSG_INFO
(
pHead
->
msgType
),
taosInetNtoa
(
pConn
->
addr
.
sin_addr
),
ntohs
(
pConn
->
addr
.
sin_port
),
taosInetNtoa
(
pConn
->
localAddr
.
sin_addr
),
ntohs
(
pConn
->
localAddr
.
sin_port
),
len
);
pHead
->
msgLen
=
htonl
(
len
);
...
...
@@ -540,7 +540,7 @@ static void uvAcceptAsyncCb(uv_async_t* async) {
static
void
uvShutDownCb
(
uv_shutdown_t
*
req
,
int
status
)
{
if
(
status
!=
0
)
{
tDebug
(
"conn failed to shut down:
%s"
,
uv_err_name
(
status
));
tDebug
(
"conn failed to shut down:%s"
,
uv_err_name
(
status
));
}
uv_close
((
uv_handle_t
*
)
req
->
handle
,
uvDestroyConn
);
taosMemoryFree
(
req
);
...
...
@@ -601,7 +601,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
tError
(
"read error %s"
,
uv_err_name
(
nread
));
}
// TODO(log other failure reason)
t
Error
(
"failed to create connect:
%p"
,
q
);
t
Warn
(
"failed to create connect:
%p"
,
q
);
taosMemoryFree
(
buf
->
base
);
uv_close
((
uv_handle_t
*
)
q
,
NULL
);
// taosMemoryFree(q);
...
...
@@ -644,7 +644,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
if
(
uv_accept
(
q
,
(
uv_stream_t
*
)(
pConn
->
pTcp
))
==
0
)
{
uv_os_fd_t
fd
;
uv_fileno
((
const
uv_handle_t
*
)
pConn
->
pTcp
,
&
fd
);
tTrace
(
"conn %p created, fd:
%d"
,
pConn
,
fd
);
tTrace
(
"conn %p created, fd:%d"
,
pConn
,
fd
);
int
addrlen
=
sizeof
(
pConn
->
addr
);
if
(
0
!=
uv_tcp_getpeername
(
pConn
->
pTcp
,
(
struct
sockaddr
*
)
&
pConn
->
addr
,
&
addrlen
))
{
...
...
@@ -712,7 +712,7 @@ static bool addHandleToAcceptloop(void* arg) {
int
err
=
0
;
if
((
err
=
uv_tcp_init
(
srv
->
loop
,
&
srv
->
server
))
!=
0
)
{
tError
(
"failed to init accept server:
%s"
,
uv_err_name
(
err
));
tError
(
"failed to init accept server:%s"
,
uv_err_name
(
err
));
return
false
;
}
...
...
@@ -724,11 +724,11 @@ static bool addHandleToAcceptloop(void* arg) {
struct
sockaddr_in
bind_addr
;
uv_ip4_addr
(
"0.0.0.0"
,
srv
->
port
,
&
bind_addr
);
if
((
err
=
uv_tcp_bind
(
&
srv
->
server
,
(
const
struct
sockaddr
*
)
&
bind_addr
,
0
))
!=
0
)
{
tError
(
"failed to bind:
%s"
,
uv_err_name
(
err
));
tError
(
"failed to bind:%s"
,
uv_err_name
(
err
));
return
false
;
}
if
((
err
=
uv_listen
((
uv_stream_t
*
)
&
srv
->
server
,
512
,
uvOnAcceptCb
))
!=
0
)
{
tError
(
"failed to listen:
%s"
,
uv_err_name
(
err
));
tError
(
"failed to listen:%s"
,
uv_err_name
(
err
));
terrno
=
TSDB_CODE_RPC_PORT_EADDRINUSE
;
return
false
;
}
...
...
@@ -765,7 +765,7 @@ static SSvrConn* createConn(void* hThrd) {
STrans
*
pTransInst
=
pThrd
->
pTransInst
;
pConn
->
refId
=
exh
->
refId
;
transRefSrvHandle
(
pConn
);
tTrace
(
"%s handle %p, conn %p created, refId:
%"
PRId64
""
,
transLabel
(
pTransInst
),
exh
,
pConn
,
pConn
->
refId
);
tTrace
(
"%s handle %p, conn %p created, refId:
%"
PRId64
,
transLabel
(
pTransInst
),
exh
,
pConn
,
pConn
->
refId
);
return
pConn
;
}
...
...
@@ -902,7 +902,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
}
if
(
false
==
taosValidIpAndPort
(
srv
->
ip
,
srv
->
port
))
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
tError
(
"invalid ip/port, %d:%d, reason:
%s"
,
srv
->
ip
,
srv
->
port
,
terrstr
());
tError
(
"invalid ip/port, %d:%d, reason:%s"
,
srv
->
ip
,
srv
->
port
,
terrstr
());
goto
End
;
}
if
(
false
==
addHandleToAcceptloop
(
srv
))
{
...
...
@@ -1023,7 +1023,7 @@ void transRefSrvHandle(void* handle) {
return
;
}
int
ref
=
T_REF_INC
((
SSvrConn
*
)
handle
);
tDebug
(
"conn %p ref count:
%d"
,
handle
,
ref
);
tDebug
(
"conn %p ref count:%d"
,
handle
,
ref
);
}
void
transUnrefSrvHandle
(
void
*
handle
)
{
...
...
@@ -1031,7 +1031,7 @@ void transUnrefSrvHandle(void* handle) {
return
;
}
int
ref
=
T_REF_DEC
((
SSvrConn
*
)
handle
);
tDebug
(
"conn %p ref count:
%d"
,
handle
,
ref
);
tDebug
(
"conn %p ref count:%d"
,
handle
,
ref
);
if
(
ref
==
0
)
{
destroyConn
((
SSvrConn
*
)
handle
,
true
);
}
...
...
source/libs/transport/test/pushServer.c
浏览文件 @
dca93b18
...
...
@@ -153,7 +153,7 @@ int main(int argc, char *argv[]) {
dDebugFlag
=
rpcDebugFlag
;
uDebugFlag
=
rpcDebugFlag
;
}
else
{
printf
(
"
\n
usage:
%s
[options]
\n
"
,
argv
[
0
]);
printf
(
"
\n
usage:
%
[options]
\n
"
,
argv
[
0
]);
printf
(
" [-p port]: server port number, default is:%d
\n
"
,
rpcInit
.
localPort
);
printf
(
" [-t threads]: number of rpc threads, default is:%d
\n
"
,
rpcInit
.
numOfThreads
);
printf
(
" [-s sessions]: number of sessions, default is:%d
\n
"
,
rpcInit
.
sessions
);
...
...
source/libs/wal/src/walRead.c
浏览文件 @
dca93b18
...
...
@@ -94,7 +94,7 @@ static int64_t walReadSeekFilePos(SWalReader *pRead, int64_t fileFirstVer, int64
ret
=
taosLSeekFile
(
pIdxTFile
,
offset
,
SEEK_SET
);
if
(
ret
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
wError
(
"failed to seek idx file, ver
%ld, pos: %ld
, since %s"
,
ver
,
offset
,
terrstr
());
wError
(
"failed to seek idx file, ver
:%"
PRId64
", pos:%"
PRId64
"
, since %s"
,
ver
,
offset
,
terrstr
());
return
-
1
;
}
SWalIdxEntry
entry
=
{
0
};
...
...
@@ -104,7 +104,7 @@ static int64_t walReadSeekFilePos(SWalReader *pRead, int64_t fileFirstVer, int64
wError
(
"failed to read idx file, since %s"
,
terrstr
());
}
else
{
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
wError
(
"read idx file incompletely, read bytes %
ld, bytes should be %lu"
,
ret
,
sizeof
(
SWalIdxEntry
));
wError
(
"read idx file incompletely, read bytes %
"
PRId64
", bytes should be %"
PRIu64
,
ret
,
sizeof
(
SWalIdxEntry
));
}
return
-
1
;
}
...
...
@@ -113,7 +113,7 @@ static int64_t walReadSeekFilePos(SWalReader *pRead, int64_t fileFirstVer, int64
ret
=
taosLSeekFile
(
pLogTFile
,
entry
.
offset
,
SEEK_SET
);
if
(
ret
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
wError
(
"failed to seek log file, ver
%ld, pos: %ld
, since %s"
,
ver
,
entry
.
offset
,
terrstr
());
wError
(
"failed to seek log file, ver
:%"
PRId64
", pos:%"
PRId64
"
, since %s"
,
ver
,
entry
.
offset
,
terrstr
());
return
-
1
;
}
return
ret
;
...
...
@@ -153,7 +153,8 @@ static int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) {
return
0
;
}
if
(
ver
>
pWal
->
vers
.
lastVer
||
ver
<
pWal
->
vers
.
firstVer
)
{
wError
(
"invalid version: % "
PRId64
", first ver %ld, last ver %ld"
,
ver
,
pWal
->
vers
.
firstVer
,
pWal
->
vers
.
lastVer
);
wError
(
"invalid version:%"
PRId64
", first ver:%"
PRId64
", last ver:%"
PRId64
,
ver
,
pWal
->
vers
.
firstVer
,
pWal
->
vers
.
lastVer
);
terrno
=
TSDB_CODE_WAL_LOG_NOT_EXIST
;
return
-
1
;
}
...
...
@@ -292,7 +293,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
code
=
walValidHeadCksum
(
pHead
);
if
(
code
!=
0
)
{
wError
(
"unexpected wal log version:
%
"
PRId64
", since head checksum not passed"
,
ver
);
wError
(
"unexpected wal log version:
%
"
PRId64
", since head checksum not passed"
,
ver
);
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
return
-
1
;
}
...
...
@@ -382,13 +383,13 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
// TODO: check wal life
if
(
pRead
->
curVersion
!=
ver
)
{
if
(
walReadSeekVer
(
pRead
,
ver
)
<
0
)
{
wError
(
"unexpected wal log version:
%
"
PRId64
", since %s"
,
ver
,
terrstr
());
wError
(
"unexpected wal log version:
%
"
PRId64
", since %s"
,
ver
,
terrstr
());
return
-
1
;
}
}
if
(
ver
>
pRead
->
pWal
->
vers
.
lastVer
||
ver
<
pRead
->
pWal
->
vers
.
firstVer
)
{
wError
(
"invalid version:
% "
PRId64
", first ver %ld, last ver %ld"
,
ver
,
pRead
->
pWal
->
vers
.
firstVer
,
wError
(
"invalid version:
%"
PRId64
", first ver:%"
PRId64
", last ver:%"
PRId64
,
ver
,
pRead
->
pWal
->
vers
.
firstVer
,
pRead
->
pWal
->
vers
.
lastVer
);
terrno
=
TSDB_CODE_WAL_LOG_NOT_EXIST
;
return
-
1
;
...
...
@@ -409,7 +410,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
code
=
walValidHeadCksum
(
pRead
->
pHead
);
if
(
code
!=
0
)
{
wError
(
"unexpected wal log version:
%
"
PRId64
", since head checksum not passed"
,
ver
);
wError
(
"unexpected wal log version:
%
"
PRId64
", since head checksum not passed"
,
ver
);
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
return
-
1
;
}
...
...
@@ -436,8 +437,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
}
if
(
pRead
->
pHead
->
head
.
version
!=
ver
)
{
wError
(
"unexpected wal log version: %"
PRId64
", read request version:%"
PRId64
""
,
pRead
->
pHead
->
head
.
version
,
ver
);
wError
(
"unexpected wal log version:%"
PRId64
", read request version:%"
PRId64
""
,
pRead
->
pHead
->
head
.
version
,
ver
);
pRead
->
curVersion
=
-
1
;
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
return
-
1
;
...
...
@@ -445,7 +445,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
code
=
walValidBodyCksum
(
pRead
->
pHead
);
if
(
code
!=
0
)
{
wError
(
"unexpected wal log version:
%
"
PRId64
", since body checksum not passed"
,
ver
);
wError
(
"unexpected wal log version:
%
"
PRId64
", since body checksum not passed"
,
ver
);
pRead
->
curVersion
=
-
1
;
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
return
-
1
;
...
...
source/libs/wal/src/walWrite.c
浏览文件 @
dca93b18
...
...
@@ -318,7 +318,7 @@ int walRoll(SWal *pWal) {
static
int
walWriteIndex
(
SWal
*
pWal
,
int64_t
ver
,
int64_t
offset
)
{
SWalIdxEntry
entry
=
{.
ver
=
ver
,
.
offset
=
offset
};
int64_t
idxOffset
=
taosLSeekFile
(
pWal
->
pWriteIdxTFile
,
0
,
SEEK_END
);
wDebug
(
"write index
: ver: %ld, offset: %ld, at %ld"
,
ver
,
offset
,
idxOffset
);
wDebug
(
"write index
, ver:%"
PRId64
", offset:%"
PRId64
", at %"
PRId64
,
ver
,
offset
,
idxOffset
);
int64_t
size
=
taosWriteFile
(
pWal
->
pWriteIdxTFile
,
&
entry
,
sizeof
(
SWalIdxEntry
));
if
(
size
!=
sizeof
(
SWalIdxEntry
))
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录