Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
35e8ad28
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
35e8ad28
编写于
7月 15, 2022
作者:
dengyihao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feat: add time unsynced check
上级
d7a463da
变更
14
隐藏空白更改
内联
并排
Showing
14 changed file
with
89 addition
and
55 deletion
+89
-55
include/common/tmsg.h
include/common/tmsg.h
+2
-0
include/util/taoserror.h
include/util/taoserror.h
+1
-0
source/client/inc/clientInt.h
source/client/inc/clientInt.h
+1
-1
source/client/src/clientEnv.c
source/client/src/clientEnv.c
+3
-1
source/client/src/clientHb.c
source/client/src/clientHb.c
+41
-41
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+1
-1
source/client/src/clientMsgHandler.c
source/client/src/clientMsgHandler.c
+12
-0
source/common/src/tmsg.c
source/common/src/tmsg.c
+4
-0
source/dnode/mnode/impl/src/mndProfile.c
source/dnode/mnode/impl/src/mndProfile.c
+3
-1
source/libs/function/src/udfd.c
source/libs/function/src/udfd.c
+9
-0
source/libs/index/src/indexCache.c
source/libs/index/src/indexCache.c
+3
-2
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+6
-6
source/libs/transport/src/transSvr.c
source/libs/transport/src/transSvr.c
+2
-2
source/util/src/terror.c
source/util/src/terror.c
+1
-0
未找到文件。
include/common/tmsg.h
浏览文件 @
35e8ad28
...
...
@@ -525,6 +525,7 @@ typedef struct {
int8_t
superUser
;
int8_t
connType
;
SEpSet
epSet
;
int32_t
svrTimestamp
;
char
sVer
[
TSDB_VERSION_LEN
];
char
sDetailVer
[
128
];
}
SConnectRsp
;
...
...
@@ -2224,6 +2225,7 @@ typedef struct {
typedef
struct
{
int64_t
reqId
;
int64_t
rspId
;
int32_t
svrTimestamp
;
SArray
*
rsps
;
// SArray<SClientHbRsp>
}
SClientHbBatchRsp
;
...
...
include/util/taoserror.h
浏览文件 @
35e8ad28
...
...
@@ -73,6 +73,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MSG_DECODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0031)
#define TSDB_CODE_NO_AVAIL_DISK TAOS_DEF_ERROR_CODE(0, 0x0032)
#define TSDB_CODE_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0033)
#define TSDB_CODE_TIME_UNSYNCED TAOS_DEF_ERROR_CODE(0, 0x0034)
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0040)
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0041)
...
...
source/client/inc/clientInt.h
浏览文件 @
35e8ad28
...
...
@@ -285,7 +285,7 @@ static FORCE_INLINE SReqResultInfo* tscGetCurResInfo(TAOS_RES* res) {
extern
SAppInfo
appInfo
;
extern
int32_t
clientReqRefPool
;
extern
int32_t
clientConnRefPool
;
extern
void
*
tscQhandle
;
extern
int32_t
timestampDeltaLimit
;
__async_send_cb_fn_t
getMsgRspHandle
(
int32_t
msgType
);
...
...
source/client/src/clientEnv.c
浏览文件 @
35e8ad28
...
...
@@ -35,6 +35,8 @@ SAppInfo appInfo;
int32_t
clientReqRefPool
=
-
1
;
int32_t
clientConnRefPool
=
-
1
;
int32_t
timestampDeltaLimit
=
900
;
// s
static
TdThreadOnce
tscinit
=
PTHREAD_ONCE_INIT
;
volatile
int32_t
tscInitRes
=
0
;
...
...
@@ -181,7 +183,7 @@ void destroyTscObj(void *pObj) {
destroyAllRequests
(
pTscObj
->
pRequests
);
taosHashCleanup
(
pTscObj
->
pRequests
);
schedulerStopQueryHb
(
pTscObj
->
pAppInfo
->
pTransporter
);
tscDebug
(
"connObj 0x%"
PRIx64
" p:%p destroyed, remain inst totalConn:%"
PRId64
,
pTscObj
->
id
,
pTscObj
,
pTscObj
->
pAppInfo
->
numOfConns
);
...
...
source/client/src/clientHb.c
浏览文件 @
35e8ad28
...
...
@@ -70,7 +70,7 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
if
(
NULL
==
vgInfo
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
vgInfo
->
vgVersion
=
rsp
->
vgVersion
;
vgInfo
->
hashMethod
=
rsp
->
hashMethod
;
vgInfo
->
vgHash
=
taosHashInit
(
rsp
->
vgNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_ENTRY_LOCK
);
...
...
@@ -156,18 +156,18 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
STscObj
*
pTscObj
=
(
STscObj
*
)
acquireTscObj
(
pRsp
->
connKey
.
tscRid
);
if
(
NULL
==
pTscObj
)
{
tscDebug
(
"tscObj rid %"
PRIx64
" not exist"
,
pRsp
->
connKey
.
tscRid
);
}
else
{
}
else
{
if
(
pRsp
->
query
->
totalDnodes
>
1
&&
!
isEpsetEqual
(
&
pTscObj
->
pAppInfo
->
mgmtEp
.
epSet
,
&
pRsp
->
query
->
epSet
))
{
SEpSet
*
pOrig
=
&
pTscObj
->
pAppInfo
->
mgmtEp
.
epSet
;
SEp
*
pOrigEp
=
&
pOrig
->
eps
[
pOrig
->
inUse
];
SEp
*
pNewEp
=
&
pRsp
->
query
->
epSet
.
eps
[
pRsp
->
query
->
epSet
.
inUse
];
tscDebug
(
"mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in hb"
,
pOrig
->
inUse
,
pOrig
->
numOfEps
,
pOrigEp
->
fqdn
,
pOrigEp
->
port
,
pRsp
->
query
->
epSet
.
inUse
,
pRsp
->
query
->
epSet
.
numOfEps
,
pNewEp
->
fqdn
,
pNewEp
->
port
);
SEpSet
*
pOrig
=
&
pTscObj
->
pAppInfo
->
mgmtEp
.
epSet
;
SEp
*
pOrigEp
=
&
pOrig
->
eps
[
pOrig
->
inUse
];
SEp
*
pNewEp
=
&
pRsp
->
query
->
epSet
.
eps
[
pRsp
->
query
->
epSet
.
inUse
];
tscDebug
(
"mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in hb"
,
pOrig
->
inUse
,
pOrig
->
numOfEps
,
pOrigEp
->
fqdn
,
pOrigEp
->
port
,
pRsp
->
query
->
epSet
.
inUse
,
pRsp
->
query
->
epSet
.
numOfEps
,
pNewEp
->
fqdn
,
pNewEp
->
port
);
updateEpSet_s
(
&
pTscObj
->
pAppInfo
->
mgmtEp
,
&
pRsp
->
query
->
epSet
);
}
pTscObj
->
pAppInfo
->
totalDnodes
=
pRsp
->
query
->
totalDnodes
;
pTscObj
->
pAppInfo
->
onlineDnodes
=
pRsp
->
query
->
onlineDnodes
;
pTscObj
->
connId
=
pRsp
->
query
->
connId
;
...
...
@@ -263,13 +263,20 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
}
static
int32_t
hbAsyncCallBack
(
void
*
param
,
SDataBuf
*
pMsg
,
int32_t
code
)
{
static
int32_t
emptyRspNum
=
0
;
static
int32_t
emptyRspNum
=
0
;
char
*
key
=
(
char
*
)
param
;
SClientHbBatchRsp
pRsp
=
{
0
};
if
(
TSDB_CODE_SUCCESS
==
code
)
{
tDeserializeSClientHbBatchRsp
(
pMsg
->
pData
,
pMsg
->
len
,
&
pRsp
);
}
int32_t
now
=
taosGetTimestampSec
();
int32_t
delta
=
abs
(
now
-
pRsp
.
svrTimestamp
);
if
(
delta
>
timestampDeltaLimit
)
{
code
=
TSDB_CODE_TIME_UNSYNCED
;
tscError
(
"time diff: %ds is too big"
,
delta
);
}
int32_t
rspNum
=
taosArrayGetSize
(
pRsp
.
rsps
);
taosThreadMutexLock
(
&
appInfo
.
mutex
);
...
...
@@ -373,7 +380,7 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
releaseTscObj
(
connKey
->
tscRid
);
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
hbBasic
->
connId
=
pTscObj
->
connId
;
int32_t
numOfQueries
=
pTscObj
->
pRequests
?
taosHashGetSize
(
pTscObj
->
pRequests
)
:
0
;
...
...
@@ -392,7 +399,6 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
int32_t
code
=
hbBuildQueryDesc
(
hbBasic
,
pTscObj
);
if
(
code
)
{
releaseTscObj
(
connKey
->
tscRid
);
...
...
@@ -436,13 +442,12 @@ int32_t hbGetExpiredUserInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, S
if
(
NULL
==
req
->
info
)
{
req
->
info
=
taosHashInit
(
64
,
hbKeyHashFunc
,
1
,
HASH_ENTRY_LOCK
);
}
taosHashPut
(
req
->
info
,
&
kv
.
key
,
sizeof
(
kv
.
key
),
&
kv
,
sizeof
(
kv
));
return
TSDB_CODE_SUCCESS
;
}
int32_t
hbGetExpiredDBInfo
(
SClientHbKey
*
connKey
,
struct
SCatalog
*
pCatalog
,
SClientHbReq
*
req
)
{
SDbVgVersion
*
dbs
=
NULL
;
uint32_t
dbNum
=
0
;
...
...
@@ -483,8 +488,8 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl
int32_t
hbGetExpiredStbInfo
(
SClientHbKey
*
connKey
,
struct
SCatalog
*
pCatalog
,
SClientHbReq
*
req
)
{
SSTableVersion
*
stbs
=
NULL
;
uint32_t
stbNum
=
0
;
int32_t
code
=
0
;
uint32_t
stbNum
=
0
;
int32_t
code
=
0
;
code
=
catalogGetExpiredSTables
(
pCatalog
,
&
stbs
,
&
stbNum
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
...
...
@@ -521,20 +526,19 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC
}
int32_t
hbGetAppInfo
(
int64_t
clusterId
,
SClientHbReq
*
req
)
{
SAppHbReq
*
pApp
=
taosHashGet
(
clientHbMgr
.
appSummary
,
&
clusterId
,
sizeof
(
clusterId
));
SAppHbReq
*
pApp
=
taosHashGet
(
clientHbMgr
.
appSummary
,
&
clusterId
,
sizeof
(
clusterId
));
if
(
NULL
!=
pApp
)
{
memcpy
(
&
req
->
app
,
pApp
,
sizeof
(
*
pApp
));
}
else
{
memset
(
&
req
->
app
.
summary
,
0
,
sizeof
(
req
->
app
.
summary
));
req
->
app
.
pid
=
taosGetPId
();
req
->
app
.
appId
=
clientHbMgr
.
appId
;
taosGetAppName
(
req
->
app
.
name
,
NULL
);
taosGetAppName
(
req
->
app
.
name
,
NULL
);
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
hbQueryHbReqHandle
(
SClientHbKey
*
connKey
,
void
*
param
,
SClientHbReq
*
req
)
{
int64_t
*
clusterId
=
(
int64_t
*
)
param
;
struct
SCatalog
*
pCatalog
=
NULL
;
...
...
@@ -602,7 +606,7 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
continue
;
}
//hbClearClientHbReq(pOneReq);
//
hbClearClientHbReq(pOneReq);
pIter
=
taosHashIterate
(
pAppHbMgr
->
activeInfo
,
pIter
);
}
...
...
@@ -615,11 +619,9 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
return
pBatchReq
;
}
void
hbThreadFuncUnexpectedStopped
(
void
)
{
atomic_store_8
(
&
clientHbMgr
.
threadStop
,
2
);
}
void
hbThreadFuncUnexpectedStopped
(
void
)
{
atomic_store_8
(
&
clientHbMgr
.
threadStop
,
2
);
}
void
hbMergeSummary
(
SAppClusterSummary
*
dst
,
SAppClusterSummary
*
src
)
{
void
hbMergeSummary
(
SAppClusterSummary
*
dst
,
SAppClusterSummary
*
src
)
{
dst
->
numOfInsertsReq
+=
src
->
numOfInsertsReq
;
dst
->
numOfInsertRows
+=
src
->
numOfInsertRows
;
dst
->
insertElapsedTime
+=
src
->
insertElapsedTime
;
...
...
@@ -633,7 +635,7 @@ void hbMergeSummary(SAppClusterSummary* dst, SAppClusterSummary* src) {
int32_t
hbGatherAppInfo
(
void
)
{
SAppHbReq
req
=
{
0
};
int
sz
=
taosArrayGetSize
(
clientHbMgr
.
appHbMgrs
);
int
sz
=
taosArrayGetSize
(
clientHbMgr
.
appHbMgrs
);
if
(
sz
>
0
)
{
req
.
pid
=
taosGetPId
();
req
.
appId
=
clientHbMgr
.
appId
;
...
...
@@ -641,11 +643,11 @@ int32_t hbGatherAppInfo(void) {
}
taosHashClear
(
clientHbMgr
.
appSummary
);
for
(
int32_t
i
=
0
;
i
<
sz
;
++
i
)
{
SAppHbMgr
*
pAppHbMgr
=
taosArrayGetP
(
clientHbMgr
.
appHbMgrs
,
i
);
uint64_t
clusterId
=
pAppHbMgr
->
pAppInstInfo
->
clusterId
;
SAppHbReq
*
pApp
=
taosHashGet
(
clientHbMgr
.
appSummary
,
&
clusterId
,
sizeof
(
clusterId
));
uint64_t
clusterId
=
pAppHbMgr
->
pAppInstInfo
->
clusterId
;
SAppHbReq
*
pApp
=
taosHashGet
(
clientHbMgr
.
appSummary
,
&
clusterId
,
sizeof
(
clusterId
));
if
(
NULL
==
pApp
)
{
memcpy
(
&
req
.
summary
,
&
pAppHbMgr
->
pAppInstInfo
->
summary
,
sizeof
(
req
.
summary
));
req
.
startTime
=
pAppHbMgr
->
startTime
;
...
...
@@ -654,7 +656,7 @@ int32_t hbGatherAppInfo(void) {
if
(
pAppHbMgr
->
startTime
<
pApp
->
startTime
)
{
pApp
->
startTime
=
pAppHbMgr
->
startTime
;
}
hbMergeSummary
(
&
pApp
->
summary
,
&
pAppHbMgr
->
pAppInstInfo
->
summary
);
}
}
...
...
@@ -662,7 +664,6 @@ int32_t hbGatherAppInfo(void) {
return
TSDB_CODE_SUCCESS
;
}
static
void
*
hbThreadFunc
(
void
*
param
)
{
setThreadName
(
"hb"
);
#ifdef WINDOWS
...
...
@@ -681,7 +682,7 @@ static void *hbThreadFunc(void *param) {
if
(
sz
>
0
)
{
hbGatherAppInfo
();
}
for
(
int
i
=
0
;
i
<
sz
;
i
++
)
{
SAppHbMgr
*
pAppHbMgr
=
taosArrayGetP
(
clientHbMgr
.
appHbMgrs
,
i
);
...
...
@@ -698,7 +699,7 @@ static void *hbThreadFunc(void *param) {
if
(
buf
==
NULL
)
{
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
tFreeClientHbBatchReq
(
pReq
);
//hbClearReqInfo(pAppHbMgr);
//
hbClearReqInfo(pAppHbMgr);
break
;
}
...
...
@@ -708,7 +709,7 @@ static void *hbThreadFunc(void *param) {
if
(
pInfo
==
NULL
)
{
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
tFreeClientHbBatchReq
(
pReq
);
//hbClearReqInfo(pAppHbMgr);
//
hbClearReqInfo(pAppHbMgr);
taosMemoryFree
(
buf
);
break
;
}
...
...
@@ -725,7 +726,7 @@ static void *hbThreadFunc(void *param) {
SEpSet
epSet
=
getEpSet_s
(
&
pAppInstInfo
->
mgmtEp
);
asyncSendMsgToServer
(
pAppInstInfo
->
pTransporter
,
&
epSet
,
&
transporterId
,
pInfo
);
tFreeClientHbBatchReq
(
pReq
);
//hbClearReqInfo(pAppHbMgr);
//
hbClearReqInfo(pAppHbMgr);
atomic_add_fetch_32
(
&
pAppHbMgr
->
reportCnt
,
1
);
}
...
...
@@ -759,7 +760,7 @@ static void hbStopThread() {
return
;
}
taosThreadJoin
(
clientHbMgr
.
thread
,
NULL
);
taosThreadJoin
(
clientHbMgr
.
thread
,
NULL
);
tscDebug
(
"hb thread stopped"
);
}
...
...
@@ -808,7 +809,7 @@ void hbFreeAppHbMgr(SAppHbMgr *pTarget) {
}
taosHashCleanup
(
pTarget
->
activeInfo
);
pTarget
->
activeInfo
=
NULL
;
taosMemoryFree
(
pTarget
->
key
);
taosMemoryFree
(
pTarget
);
}
...
...
@@ -843,7 +844,7 @@ int hbMgrInit() {
clientHbMgr
.
appId
=
tGenIdPI64
();
tscDebug
(
"app %"
PRIx64
" initialized"
,
clientHbMgr
.
appId
);
clientHbMgr
.
appSummary
=
taosHashInit
(
10
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
,
HASH_NO_LOCK
);
clientHbMgr
.
appHbMgrs
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
taosThreadMutexInit
(
&
clientHbMgr
.
lock
,
NULL
);
...
...
@@ -881,7 +882,7 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, int64_t clust
SClientHbReq
hbReq
=
{
0
};
hbReq
.
connKey
=
connKey
;
hbReq
.
clusterId
=
clusterId
;
//hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
//
hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
taosHashPut
(
pAppHbMgr
->
activeInfo
,
&
connKey
,
sizeof
(
SClientHbKey
),
&
hbReq
,
sizeof
(
SClientHbReq
));
...
...
@@ -920,4 +921,3 @@ void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) {
atomic_sub_fetch_32
(
&
pAppHbMgr
->
connKeyCnt
,
1
);
}
source/client/src/clientImpl.c
浏览文件 @
35e8ad28
...
...
@@ -1478,7 +1478,7 @@ void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertU
tsem_wait
(
&
pParam
->
sem
);
}
if
(
pResultInfo
->
numOfRows
==
0
||
pRequest
->
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
pResultInfo
->
numOfRows
==
0
||
pRequest
->
code
!=
TSDB_CODE_SUCCESS
)
{
return
NULL
;
}
else
{
if
(
setupOneRowPtr
)
{
...
...
source/client/src/clientMsgHandler.c
浏览文件 @
35e8ad28
...
...
@@ -52,6 +52,18 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
SConnectRsp
connectRsp
=
{
0
};
tDeserializeSConnectRsp
(
pMsg
->
pData
,
pMsg
->
len
,
&
connectRsp
);
int32_t
now
=
taosGetTimestampSec
();
int32_t
delta
=
abs
(
now
-
connectRsp
.
svrTimestamp
);
if
(
delta
>
timestampDeltaLimit
)
{
code
=
TSDB_CODE_TIME_UNSYNCED
;
tscError
(
"time diff: %"
PRId64
"ms is too big"
,
delta
);
taosMemoryFree
(
pMsg
->
pData
);
setErrno
(
pRequest
,
code
);
tsem_post
(
&
pRequest
->
body
.
rspSem
);
return
code
;
}
/*assert(connectRsp.epSet.numOfEps > 0);*/
if
(
connectRsp
.
epSet
.
numOfEps
==
0
)
{
taosMemoryFree
(
pMsg
->
pData
);
...
...
source/common/src/tmsg.c
浏览文件 @
35e8ad28
...
...
@@ -453,6 +453,7 @@ int32_t tSerializeSClientHbBatchRsp(void *buf, int32_t bufLen, const SClientHbBa
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pBatchRsp
->
reqId
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pBatchRsp
->
rspId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pBatchRsp
->
svrTimestamp
)
<
0
)
return
-
1
;
int32_t
rspNum
=
taosArrayGetSize
(
pBatchRsp
->
rsps
);
if
(
tEncodeI32
(
&
encoder
,
rspNum
)
<
0
)
return
-
1
;
...
...
@@ -474,6 +475,7 @@ int32_t tDeserializeSClientHbBatchRsp(void *buf, int32_t bufLen, SClientHbBatchR
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pBatchRsp
->
reqId
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pBatchRsp
->
rspId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pBatchRsp
->
svrTimestamp
)
<
0
)
return
-
1
;
int32_t
rspNum
=
0
;
if
(
tDecodeI32
(
&
decoder
,
&
rspNum
)
<
0
)
return
-
1
;
...
...
@@ -3613,6 +3615,7 @@ int32_t tSerializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
if
(
tEncodeI8
(
&
encoder
,
pRsp
->
superUser
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pRsp
->
connType
)
<
0
)
return
-
1
;
if
(
tEncodeSEpSet
(
&
encoder
,
&
pRsp
->
epSet
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pRsp
->
svrTimestamp
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pRsp
->
sVer
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pRsp
->
sDetailVer
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
...
...
@@ -3634,6 +3637,7 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
if
(
tDecodeI8
(
&
decoder
,
&
pRsp
->
superUser
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pRsp
->
connType
)
<
0
)
return
-
1
;
if
(
tDecodeSEpSet
(
&
decoder
,
&
pRsp
->
epSet
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pRsp
->
svrTimestamp
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pRsp
->
sVer
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pRsp
->
sDetailVer
)
<
0
)
return
-
1
;
tEndDecode
(
&
decoder
);
...
...
source/dnode/mnode/impl/src/mndProfile.c
浏览文件 @
35e8ad28
...
...
@@ -15,10 +15,10 @@
#define _DEFAULT_SOURCE
#include "mndProfile.h"
#include "mndPrivilege.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndPrivilege.h"
#include "mndQnode.h"
#include "mndShow.h"
#include "mndStb.h"
...
...
@@ -274,6 +274,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
connectRsp
.
connId
=
pConn
->
id
;
connectRsp
.
connType
=
connReq
.
connType
;
connectRsp
.
dnodeNum
=
mndGetDnodeSize
(
pMnode
);
connectRsp
.
svrTimestamp
=
taosGetTimestampSec
();
strcpy
(
connectRsp
.
sVer
,
version
);
snprintf
(
connectRsp
.
sDetailVer
,
sizeof
(
connectRsp
.
sDetailVer
),
"ver:%s
\n
build:%s
\n
gitinfo:%s"
,
version
,
buildinfo
,
...
...
@@ -623,6 +624,7 @@ static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq) {
}
SClientHbBatchRsp
batchRsp
=
{
0
};
batchRsp
.
svrTimestamp
=
taosGetTimestampMs
();
batchRsp
.
rsps
=
taosArrayInit
(
0
,
sizeof
(
SClientHbRsp
));
int32_t
sz
=
taosArrayGetSize
(
batchReq
.
reqs
);
...
...
source/libs/function/src/udfd.c
浏览文件 @
35e8ad28
...
...
@@ -382,6 +382,15 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
if
(
msgInfo
->
rpcType
==
UDFD_RPC_MNODE_CONNECT
)
{
SConnectRsp
connectRsp
=
{
0
};
tDeserializeSConnectRsp
(
pMsg
->
pCont
,
pMsg
->
contLen
,
&
connectRsp
);
int32_t
now
=
taosGetTimestampSec
();
int32_t
delta
=
abs
(
now
-
connectRsp
.
svrTimestamp
);
if
(
delta
>
900
)
{
msgInfo
->
code
=
TSDB_CODE_TIME_UNSYNCED
;
goto
_return
;
}
if
(
connectRsp
.
epSet
.
numOfEps
==
0
)
{
msgInfo
->
code
=
TSDB_CODE_MND_APP_ERROR
;
goto
_return
;
...
...
source/libs/index/src/indexCache.c
浏览文件 @
35e8ad28
...
...
@@ -516,13 +516,14 @@ static void idxCacheMakeRoomForWrite(IndexCache* cache) {
idxCacheRef
(
cache
);
cache
->
imm
=
cache
->
mem
;
cache
->
mem
=
idxInternalCacheCreate
(
cache
->
type
);
cache
->
mem
->
pCache
=
cache
;
cache
->
occupiedMem
=
0
;
if
(
quit
==
false
)
{
atomic_store_32
(
&
cache
->
merging
,
1
);
}
// sched to merge
// unref cache in bgwork
//
1.
sched to merge
//
2.
unref cache in bgwork
idxCacheSchedToMerge
(
cache
,
quit
);
}
}
...
...
source/libs/transport/src/transCli.c
浏览文件 @
35e8ad28
...
...
@@ -1041,7 +1041,7 @@ static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
STraceId
*
trace
=
&
pMsg
->
msg
.
info
.
traceId
;
char
tbuf
[
256
]
=
{
0
};
EPSET_DEBUG_STR
(
&
pCtx
->
epSet
,
tbuf
);
tG
Trace
(
"%s retry on next node, use %s, retryCnt:%d, limit:%d"
,
transLabel
(
pThrd
->
pTransInst
),
tbuf
,
tG
Debug
(
"%s retry on next node, use %s, retryCnt:%d, limit:%d"
,
transLabel
(
pThrd
->
pTransInst
),
tbuf
,
pCtx
->
retryCnt
+
1
,
pCtx
->
retryLimit
);
STaskArg
*
arg
=
taosMemoryMalloc
(
sizeof
(
STaskArg
));
...
...
@@ -1133,11 +1133,11 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
if
(
hasEpSet
)
{
char
tbuf
[
256
]
=
{
0
};
EPSET_DEBUG_STR
(
&
pCtx
->
epSet
,
tbuf
);
tG
Trace
(
"%s conn %p extract epset from msg"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
);
tG
Debug
(
"%s conn %p extract epset from msg"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
);
}
if
(
pCtx
->
pSem
!=
NULL
)
{
tG
Trace
(
"%s conn %p(sync) handle resp"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
);
tG
Debug
(
"%s conn %p(sync) handle resp"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
);
if
(
pCtx
->
pRsp
==
NULL
)
{
tGTrace
(
"%s conn %p(sync) failed to resp, ignore"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
);
}
else
{
...
...
@@ -1146,7 +1146,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
tsem_post
(
pCtx
->
pSem
);
pCtx
->
pRsp
=
NULL
;
}
else
{
tG
Trace
(
"%s conn %p handle resp"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
);
tG
Debug
(
"%s conn %p handle resp"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
);
if
(
retry
==
false
&&
hasEpSet
==
true
)
{
pTransInst
->
cfp
(
pTransInst
->
parent
,
pResp
,
&
pCtx
->
epSet
);
}
else
{
...
...
@@ -1256,7 +1256,7 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra
cliMsg
->
refId
=
(
int64_t
)
shandle
;
STraceId
*
trace
=
&
pReq
->
info
.
traceId
;
tG
Trace
(
"%s send request at thread:%08"
PRId64
", dst:%s:%d, app:%p"
,
transLabel
(
pTransInst
),
pThrd
->
pid
,
tG
Debug
(
"%s send request at thread:%08"
PRId64
", dst:%s:%d, app:%p"
,
transLabel
(
pTransInst
),
pThrd
->
pid
,
EPSET_GET_INUSE_IP
(
&
pCtx
->
epSet
),
EPSET_GET_INUSE_PORT
(
&
pCtx
->
epSet
),
pReq
->
info
.
ahandle
);
ASSERT
(
transAsyncSend
(
pThrd
->
asyncPool
,
&
(
cliMsg
->
q
))
==
0
);
transReleaseExHandle
(
transGetInstMgt
(),
(
int64_t
)
shandle
);
...
...
@@ -1296,7 +1296,7 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM
cliMsg
->
refId
=
(
int64_t
)
shandle
;
STraceId
*
trace
=
&
pReq
->
info
.
traceId
;
tG
Trace
(
"%s send request at thread:%08"
PRId64
", dst:%s:%d, app:%p"
,
transLabel
(
pTransInst
),
pThrd
->
pid
,
tG
Debug
(
"%s send request at thread:%08"
PRId64
", dst:%s:%d, app:%p"
,
transLabel
(
pTransInst
),
pThrd
->
pid
,
EPSET_GET_INUSE_IP
(
&
pCtx
->
epSet
),
EPSET_GET_INUSE_PORT
(
&
pCtx
->
epSet
),
pReq
->
info
.
ahandle
);
transAsyncSend
(
pThrd
->
asyncPool
,
&
(
cliMsg
->
q
));
...
...
source/libs/transport/src/transSvr.c
浏览文件 @
35e8ad28
...
...
@@ -1020,7 +1020,7 @@ void transRefSrvHandle(void* handle) {
return
;
}
int
ref
=
T_REF_INC
((
SSvrConn
*
)
handle
);
t
Debug
(
"conn %p ref count:%d"
,
handle
,
ref
);
t
Trace
(
"conn %p ref count:%d"
,
handle
,
ref
);
}
void
transUnrefSrvHandle
(
void
*
handle
)
{
...
...
@@ -1028,7 +1028,7 @@ void transUnrefSrvHandle(void* handle) {
return
;
}
int
ref
=
T_REF_DEC
((
SSvrConn
*
)
handle
);
t
Debug
(
"conn %p ref count:%d"
,
handle
,
ref
);
t
Trace
(
"conn %p ref count:%d"
,
handle
,
ref
);
if
(
ref
==
0
)
{
destroyConn
((
SSvrConn
*
)
handle
,
true
);
}
...
...
source/util/src/terror.c
浏览文件 @
35e8ad28
...
...
@@ -78,6 +78,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TIMESTAMP, "Invalid timestamp for
TAOS_DEFINE_ERROR
(
TSDB_CODE_MSG_DECODE_ERROR
,
"Msg decode error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_NO_AVAIL_DISK
,
"No available disk"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_NOT_FOUND
,
"Not found"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TIME_UNSYNCED
,
"Unsynced time"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_REF_NO_MEMORY
,
"Ref out of memory"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_REF_FULL
,
"too many Ref Objs"
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录