Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d3203263
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
d3203263
编写于
6月 17, 2022
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
reset query cache
上级
d12614aa
变更
18
显示空白变更内容
内联
并排
Showing
18 changed file
with
227 addition
and
118 deletion
+227
-118
include/libs/catalog/catalog.h
include/libs/catalog/catalog.h
+2
-0
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+3
-0
source/client/src/clientMain.c
source/client/src/clientMain.c
+3
-0
source/common/src/tglobal.c
source/common/src/tglobal.c
+3
-5
source/libs/catalog/inc/catalogInt.h
source/libs/catalog/inc/catalogInt.h
+21
-13
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+24
-7
source/libs/catalog/src/ctgAsync.c
source/libs/catalog/src/ctgAsync.c
+2
-2
source/libs/catalog/src/ctgCache.c
source/libs/catalog/src/ctgCache.c
+110
-45
source/libs/catalog/src/ctgDbg.c
source/libs/catalog/src/ctgDbg.c
+11
-9
source/libs/catalog/src/ctgRemote.c
source/libs/catalog/src/ctgRemote.c
+3
-3
source/libs/catalog/src/ctgUtil.c
source/libs/catalog/src/ctgUtil.c
+1
-1
source/libs/command/src/command.c
source/libs/command/src/command.c
+2
-2
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+3
-1
source/libs/scheduler/src/schJob.c
source/libs/scheduler/src/schJob.c
+20
-18
source/libs/scheduler/src/schRemote.c
source/libs/scheduler/src/schRemote.c
+3
-3
source/libs/scheduler/src/schUtil.c
source/libs/scheduler/src/schUtil.c
+1
-1
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+14
-6
tools/shell/src/shellEngine.c
tools/shell/src/shellEngine.c
+1
-2
未找到文件。
include/libs/catalog/catalog.h
浏览文件 @
d3203263
...
@@ -292,6 +292,8 @@ int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId,
...
@@ -292,6 +292,8 @@ int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId,
int32_t
ctgdLaunchAsyncCall
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
uint64_t
reqId
,
bool
forceUpdate
);
int32_t
ctgdLaunchAsyncCall
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
uint64_t
reqId
,
bool
forceUpdate
);
int32_t
catalogClearCache
(
void
);
/**
/**
* Destroy catalog and relase all resources
* Destroy catalog and relase all resources
*/
*/
...
...
source/client/src/clientImpl.c
浏览文件 @
d3203263
...
@@ -609,6 +609,9 @@ void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) {
...
@@ -609,6 +609,9 @@ void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) {
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
param
;
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
param
;
pRequest
->
code
=
code
;
pRequest
->
code
=
code
;
tscDebug
(
"0x%"
PRIx64
" enter scheduler exec cb, code:%d - %s, reqId:0x%"
PRIx64
,
pRequest
->
self
,
code
,
tstrerror
(
code
),
pRequest
->
requestId
);
STscObj
*
pTscObj
=
pRequest
->
pTscObj
;
STscObj
*
pTscObj
=
pRequest
->
pTscObj
;
if
(
code
!=
TSDB_CODE_SUCCESS
&&
NEED_CLIENT_HANDLE_ERROR
(
code
))
{
if
(
code
!=
TSDB_CODE_SUCCESS
&&
NEED_CLIENT_HANDLE_ERROR
(
code
))
{
tscDebug
(
"0x%"
PRIx64
" client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%"
PRIx64
,
tscDebug
(
"0x%"
PRIx64
" client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%"
PRIx64
,
...
...
source/client/src/clientMain.c
浏览文件 @
d3203263
...
@@ -837,6 +837,9 @@ static void fetchCallback(void *pResult, void *param, int32_t code) {
...
@@ -837,6 +837,9 @@ static void fetchCallback(void *pResult, void *param, int32_t code) {
SReqResultInfo
*
pResultInfo
=
&
pRequest
->
body
.
resInfo
;
SReqResultInfo
*
pResultInfo
=
&
pRequest
->
body
.
resInfo
;
tscDebug
(
"0x%"
PRIx64
" enter scheduler fetch cb, code:%d - %s, reqId:0x%"
PRIx64
,
pRequest
->
self
,
code
,
tstrerror
(
code
),
pRequest
->
requestId
);
pResultInfo
->
pData
=
pResult
;
pResultInfo
->
pData
=
pResult
;
pResultInfo
->
numOfRows
=
0
;
pResultInfo
->
numOfRows
=
0
;
...
...
source/common/src/tglobal.c
浏览文件 @
d3203263
...
@@ -60,7 +60,7 @@ int32_t tsNumOfVnodeWriteThreads = 2;
...
@@ -60,7 +60,7 @@ int32_t tsNumOfVnodeWriteThreads = 2;
int32_t
tsNumOfVnodeSyncThreads
=
2
;
int32_t
tsNumOfVnodeSyncThreads
=
2
;
int32_t
tsNumOfVnodeMergeThreads
=
2
;
int32_t
tsNumOfVnodeMergeThreads
=
2
;
int32_t
tsNumOfQnodeQueryThreads
=
2
;
int32_t
tsNumOfQnodeQueryThreads
=
2
;
int32_t
tsNumOfQnodeFetchThreads
=
2
;
int32_t
tsNumOfQnodeFetchThreads
=
1
;
int32_t
tsNumOfSnodeSharedThreads
=
2
;
int32_t
tsNumOfSnodeSharedThreads
=
2
;
int32_t
tsNumOfSnodeUniqueThreads
=
2
;
int32_t
tsNumOfSnodeUniqueThreads
=
2
;
...
@@ -418,8 +418,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
...
@@ -418,8 +418,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfVnodeQueryThreads
=
TMAX
(
tsNumOfVnodeQueryThreads
,
1
);
tsNumOfVnodeQueryThreads
=
TMAX
(
tsNumOfVnodeQueryThreads
,
1
);
if
(
cfgAddInt32
(
pCfg
,
"numOfVnodeQueryThreads"
,
tsNumOfVnodeQueryThreads
,
1
,
1024
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"numOfVnodeQueryThreads"
,
tsNumOfVnodeQueryThreads
,
1
,
1024
,
0
)
!=
0
)
return
-
1
;
tsNumOfVnodeFetchThreads
=
tsNumOfCores
/
2
;
tsNumOfVnodeFetchThreads
=
TRANGE
(
tsNumOfVnodeFetchThreads
,
1
,
1
);
tsNumOfVnodeFetchThreads
=
TRANGE
(
tsNumOfVnodeFetchThreads
,
2
,
4
);
if
(
cfgAddInt32
(
pCfg
,
"numOfVnodeFetchThreads"
,
tsNumOfVnodeFetchThreads
,
1
,
1024
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"numOfVnodeFetchThreads"
,
tsNumOfVnodeFetchThreads
,
1
,
1024
,
0
)
!=
0
)
return
-
1
;
tsNumOfVnodeWriteThreads
=
tsNumOfCores
;
tsNumOfVnodeWriteThreads
=
tsNumOfCores
;
...
@@ -438,8 +437,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
...
@@ -438,8 +437,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfQnodeQueryThreads
=
TMAX
(
tsNumOfQnodeQueryThreads
,
1
);
tsNumOfQnodeQueryThreads
=
TMAX
(
tsNumOfQnodeQueryThreads
,
1
);
if
(
cfgAddInt32
(
pCfg
,
"numOfQnodeQueryThreads"
,
tsNumOfQnodeQueryThreads
,
1
,
1024
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"numOfQnodeQueryThreads"
,
tsNumOfQnodeQueryThreads
,
1
,
1024
,
0
)
!=
0
)
return
-
1
;
tsNumOfQnodeFetchThreads
=
tsNumOfCores
/
2
;
tsNumOfQnodeFetchThreads
=
TRANGE
(
tsNumOfQnodeFetchThreads
,
1
,
1
);
tsNumOfQnodeFetchThreads
=
TRANGE
(
tsNumOfQnodeFetchThreads
,
2
,
4
);
if
(
cfgAddInt32
(
pCfg
,
"numOfQnodeFetchThreads"
,
tsNumOfQnodeFetchThreads
,
1
,
1024
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"numOfQnodeFetchThreads"
,
tsNumOfQnodeFetchThreads
,
1
,
1024
,
0
)
!=
0
)
return
-
1
;
tsNumOfSnodeSharedThreads
=
tsNumOfCores
/
4
;
tsNumOfSnodeSharedThreads
=
tsNumOfCores
/
4
;
...
...
source/libs/catalog/inc/catalogInt.h
浏览文件 @
d3203263
...
@@ -59,6 +59,7 @@ enum {
...
@@ -59,6 +59,7 @@ enum {
CTG_OP_UPDATE_VG_EPSET
,
CTG_OP_UPDATE_VG_EPSET
,
CTG_OP_UPDATE_TB_INDEX
,
CTG_OP_UPDATE_TB_INDEX
,
CTG_OP_DROP_TB_INDEX
,
CTG_OP_DROP_TB_INDEX
,
CTG_OP_CLEAR_CACHE
,
CTG_OP_MAX
CTG_OP_MAX
};
};
...
@@ -328,6 +329,10 @@ typedef struct SCtgDropTbIndexMsg {
...
@@ -328,6 +329,10 @@ typedef struct SCtgDropTbIndexMsg {
char
tbName
[
TSDB_TABLE_NAME_LEN
];
char
tbName
[
TSDB_TABLE_NAME_LEN
];
}
SCtgDropTbIndexMsg
;
}
SCtgDropTbIndexMsg
;
typedef
struct
SCtgClearCacheMsg
{
SCatalog
*
pCtg
;
}
SCtgClearCacheMsg
;
typedef
struct
SCtgUpdateEpsetMsg
{
typedef
struct
SCtgUpdateEpsetMsg
{
SCatalog
*
pCtg
;
SCatalog
*
pCtg
;
char
dbFName
[
TSDB_DB_FNAME_LEN
];
char
dbFName
[
TSDB_DB_FNAME_LEN
];
...
@@ -502,6 +507,7 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool sy
...
@@ -502,6 +507,7 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool sy
int32_t
ctgUpdateUserEnqueue
(
SCatalog
*
pCtg
,
SGetUserAuthRsp
*
pAuth
,
bool
syncReq
);
int32_t
ctgUpdateUserEnqueue
(
SCatalog
*
pCtg
,
SGetUserAuthRsp
*
pAuth
,
bool
syncReq
);
int32_t
ctgUpdateVgEpsetEnqueue
(
SCatalog
*
pCtg
,
char
*
dbFName
,
int32_t
vgId
,
SEpSet
*
pEpSet
);
int32_t
ctgUpdateVgEpsetEnqueue
(
SCatalog
*
pCtg
,
char
*
dbFName
,
int32_t
vgId
,
SEpSet
*
pEpSet
);
int32_t
ctgUpdateTbIndexEnqueue
(
SCatalog
*
pCtg
,
STableIndex
**
pIndex
,
bool
syncOp
);
int32_t
ctgUpdateTbIndexEnqueue
(
SCatalog
*
pCtg
,
STableIndex
**
pIndex
,
bool
syncOp
);
int32_t
ctgClearCacheEnqueue
(
SCatalog
*
pCtg
,
bool
syncOp
);
int32_t
ctgMetaRentInit
(
SCtgRentMgmt
*
mgmt
,
uint32_t
rentSec
,
int8_t
type
);
int32_t
ctgMetaRentInit
(
SCtgRentMgmt
*
mgmt
,
uint32_t
rentSec
,
int8_t
type
);
int32_t
ctgMetaRentAdd
(
SCtgRentMgmt
*
mgmt
,
void
*
meta
,
int64_t
id
,
int32_t
size
);
int32_t
ctgMetaRentAdd
(
SCtgRentMgmt
*
mgmt
,
void
*
meta
,
int64_t
id
,
int32_t
size
);
int32_t
ctgMetaRentGet
(
SCtgRentMgmt
*
mgmt
,
void
**
res
,
uint32_t
*
num
,
int32_t
size
);
int32_t
ctgMetaRentGet
(
SCtgRentMgmt
*
mgmt
,
void
**
res
,
uint32_t
*
num
,
int32_t
size
);
...
@@ -513,6 +519,8 @@ int32_t ctgReadTbIndexFromCache(SCatalog* pCtg, SName* pTableName, SArray** pRes
...
@@ -513,6 +519,8 @@ int32_t ctgReadTbIndexFromCache(SCatalog* pCtg, SName* pTableName, SArray** pRes
int32_t
ctgDropTbIndexEnqueue
(
SCatalog
*
pCtg
,
SName
*
pName
,
bool
syncOp
);
int32_t
ctgDropTbIndexEnqueue
(
SCatalog
*
pCtg
,
SName
*
pName
,
bool
syncOp
);
int32_t
ctgOpDropTbIndex
(
SCtgCacheOperation
*
operation
);
int32_t
ctgOpDropTbIndex
(
SCtgCacheOperation
*
operation
);
int32_t
ctgOpUpdateTbIndex
(
SCtgCacheOperation
*
operation
);
int32_t
ctgOpUpdateTbIndex
(
SCtgCacheOperation
*
operation
);
int32_t
ctgOpClearCache
(
SCtgCacheOperation
*
operation
);
...
@@ -547,7 +555,7 @@ int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2);
...
@@ -547,7 +555,7 @@ int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2);
int32_t
ctgDbVgVersionSearchCompare
(
const
void
*
key1
,
const
void
*
key2
);
int32_t
ctgDbVgVersionSearchCompare
(
const
void
*
key1
,
const
void
*
key2
);
void
ctgFreeSTableMetaOutput
(
STableMetaOutput
*
pOutput
);
void
ctgFreeSTableMetaOutput
(
STableMetaOutput
*
pOutput
);
int32_t
ctgUpdateMsgCtx
(
SCtgMsgCtx
*
pCtx
,
int32_t
reqType
,
void
*
out
,
char
*
target
);
int32_t
ctgUpdateMsgCtx
(
SCtgMsgCtx
*
pCtx
,
int32_t
reqType
,
void
*
out
,
char
*
target
);
char
*
ctgTaskTypeStr
(
CTG_TASK_TYPE
type
);
char
*
ctgTaskTypeStr
(
CTG_TASK_TYPE
type
);
int32_t
ctgUpdateSendTargetInfo
(
SMsgSendInfo
*
pMsgSendInfo
,
int32_t
msgType
,
SCtgTask
*
pTask
);
int32_t
ctgUpdateSendTargetInfo
(
SMsgSendInfo
*
pMsgSendInfo
,
int32_t
msgType
,
SCtgTask
*
pTask
);
int32_t
ctgCloneTableIndex
(
SArray
*
pIndex
,
SArray
**
pRes
);
int32_t
ctgCloneTableIndex
(
SArray
*
pIndex
,
SArray
**
pRes
);
void
ctgFreeSTableIndex
(
void
*
info
);
void
ctgFreeSTableIndex
(
void
*
info
);
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
d3203263
...
@@ -105,7 +105,7 @@ int32_t ctgRefreshDBVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, const char*
...
@@ -105,7 +105,7 @@ int32_t ctgRefreshDBVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, const char*
code
=
ctgGetDBVgInfoFromMnode
(
pCtg
,
pConn
,
&
input
,
&
DbOut
,
NULL
);
code
=
ctgGetDBVgInfoFromMnode
(
pCtg
,
pConn
,
&
input
,
&
DbOut
,
NULL
);
if
(
code
)
{
if
(
code
)
{
if
(
CTG_DB_NOT_EXIST
(
code
)
&&
(
NULL
!=
dbCache
))
{
if
(
CTG_DB_NOT_EXIST
(
code
)
&&
(
NULL
!=
dbCache
))
{
ctgDebug
(
"db no longer exist, dbFName:%s, dbId:%"
PRIx64
,
input
.
db
,
input
.
dbId
);
ctgDebug
(
"db no longer exist, dbFName:%s, dbId:
0x
%"
PRIx64
,
input
.
db
,
input
.
dbId
);
ctgDropDbCacheEnqueue
(
pCtg
,
input
.
db
,
input
.
dbId
);
ctgDropDbCacheEnqueue
(
pCtg
,
input
.
db
,
input
.
dbId
);
}
}
...
@@ -571,7 +571,7 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
...
@@ -571,7 +571,7 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
}
}
if
(
NULL
==
gCtgMgmt
.
pCluster
)
{
if
(
NULL
==
gCtgMgmt
.
pCluster
)
{
qError
(
"catalog cluster cache are not ready, clusterId:%"
PRIx64
,
clusterId
);
qError
(
"catalog cluster cache are not ready, clusterId:
0x
%"
PRIx64
,
clusterId
);
CTG_ERR_RET
(
TSDB_CODE_CTG_NOT_READY
);
CTG_ERR_RET
(
TSDB_CODE_CTG_NOT_READY
);
}
}
...
@@ -583,7 +583,7 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
...
@@ -583,7 +583,7 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
if
(
ctg
&&
(
*
ctg
))
{
if
(
ctg
&&
(
*
ctg
))
{
*
catalogHandle
=
*
ctg
;
*
catalogHandle
=
*
ctg
;
qDebug
(
"got catalog handle from cache, clusterId:%"
PRIx64
", CTG:%p"
,
clusterId
,
*
ctg
);
qDebug
(
"got catalog handle from cache, clusterId:
0x
%"
PRIx64
", CTG:%p"
,
clusterId
,
*
ctg
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
@@ -612,11 +612,11 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
...
@@ -612,11 +612,11 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
continue
;
continue
;
}
}
qError
(
"taosHashPut CTG to cache failed, clusterId:%"
PRIx64
,
clusterId
);
qError
(
"taosHashPut CTG to cache failed, clusterId:
0x
%"
PRIx64
,
clusterId
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
}
qDebug
(
"add CTG to cache, clusterId:%"
PRIx64
", CTG:%p"
,
clusterId
,
clusterCtg
);
qDebug
(
"add CTG to cache, clusterId:
0x
%"
PRIx64
", CTG:%p"
,
clusterId
,
clusterCtg
);
break
;
break
;
}
}
...
@@ -640,7 +640,7 @@ void catalogFreeHandle(SCatalog* pCtg) {
...
@@ -640,7 +640,7 @@ void catalogFreeHandle(SCatalog* pCtg) {
}
}
if
(
taosHashRemove
(
gCtgMgmt
.
pCluster
,
&
pCtg
->
clusterId
,
sizeof
(
pCtg
->
clusterId
)))
{
if
(
taosHashRemove
(
gCtgMgmt
.
pCluster
,
&
pCtg
->
clusterId
,
sizeof
(
pCtg
->
clusterId
)))
{
ctgWarn
(
"taosHashRemove from cluster failed, may already be freed, clusterId:%"
PRIx64
,
pCtg
->
clusterId
);
ctgWarn
(
"taosHashRemove from cluster failed, may already be freed, clusterId:
0x
%"
PRIx64
,
pCtg
->
clusterId
);
return
;
return
;
}
}
...
@@ -650,7 +650,7 @@ void catalogFreeHandle(SCatalog* pCtg) {
...
@@ -650,7 +650,7 @@ void catalogFreeHandle(SCatalog* pCtg) {
ctgFreeHandle
(
pCtg
);
ctgFreeHandle
(
pCtg
);
ctgInfo
(
"handle freed, culsterId:%"
PRIx64
,
clusterId
);
ctgInfo
(
"handle freed, culsterId:
0x
%"
PRIx64
,
clusterId
);
}
}
int32_t
catalogGetDBVgVersion
(
SCatalog
*
pCtg
,
const
char
*
dbFName
,
int32_t
*
version
,
int64_t
*
dbId
,
int32_t
*
tableNum
)
{
int32_t
catalogGetDBVgVersion
(
SCatalog
*
pCtg
,
const
char
*
dbFName
,
int32_t
*
version
,
int64_t
*
dbId
,
int32_t
*
tableNum
)
{
...
@@ -1247,6 +1247,23 @@ int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth) {
...
@@ -1247,6 +1247,23 @@ int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth) {
CTG_API_LEAVE
(
ctgUpdateUserEnqueue
(
pCtg
,
pAuth
,
false
));
CTG_API_LEAVE
(
ctgUpdateUserEnqueue
(
pCtg
,
pAuth
,
false
));
}
}
int32_t
catalogClearCache
(
void
)
{
CTG_API_ENTER
();
qInfo
(
"start to clear catalog cache"
);
if
(
NULL
==
gCtgMgmt
.
pCluster
||
atomic_load_8
((
int8_t
*
)
&
gCtgMgmt
.
exit
))
{
CTG_API_LEAVE
(
TSDB_CODE_SUCCESS
);
}
int32_t
code
=
ctgClearCacheEnqueue
(
NULL
,
true
);
qInfo
(
"clear catalog cache end, code: %s"
,
tstrerror
(
code
));
CTG_API_LEAVE
(
code
);
}
void
catalogDestroy
(
void
)
{
void
catalogDestroy
(
void
)
{
qInfo
(
"start to destroy catalog"
);
qInfo
(
"start to destroy catalog"
);
...
...
source/libs/catalog/src/ctgAsync.c
浏览文件 @
d3203263
...
@@ -622,7 +622,7 @@ int32_t ctgHandleTaskEnd(SCtgTask* pTask, int32_t rspCode) {
...
@@ -622,7 +622,7 @@ int32_t ctgHandleTaskEnd(SCtgTask* pTask, int32_t rspCode) {
SCtgJob
*
pJob
=
pTask
->
pJob
;
SCtgJob
*
pJob
=
pTask
->
pJob
;
int32_t
code
=
0
;
int32_t
code
=
0
;
qDebug
(
"QID:0x%"
PRIx64
" task %d end with r
sp
%s"
,
pJob
->
queryId
,
pTask
->
taskId
,
tstrerror
(
rspCode
));
qDebug
(
"QID:0x%"
PRIx64
" task %d end with r
es
%s"
,
pJob
->
queryId
,
pTask
->
taskId
,
tstrerror
(
rspCode
));
pTask
->
code
=
rspCode
;
pTask
->
code
=
rspCode
;
...
@@ -1276,7 +1276,7 @@ int32_t ctgLaunchJob(SCtgJob *pJob) {
...
@@ -1276,7 +1276,7 @@ int32_t ctgLaunchJob(SCtgJob *pJob) {
for
(
int32_t
i
=
0
;
i
<
taskNum
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
taskNum
;
++
i
)
{
SCtgTask
*
pTask
=
taosArrayGet
(
pJob
->
pTasks
,
i
);
SCtgTask
*
pTask
=
taosArrayGet
(
pJob
->
pTasks
,
i
);
qDebug
(
"QID:0x%"
PRIx64
" start to launch task %d"
,
pJob
->
queryId
,
pTask
->
taskId
);
qDebug
(
"QID:0x%"
PRIx64
"
ctg
start to launch task %d"
,
pJob
->
queryId
,
pTask
->
taskId
);
CTG_ERR_RET
((
*
gCtgAsyncFps
[
pTask
->
type
].
launchFp
)(
pTask
));
CTG_ERR_RET
((
*
gCtgAsyncFps
[
pTask
->
type
].
launchFp
)(
pTask
));
}
}
...
...
source/libs/catalog/src/ctgCache.c
浏览文件 @
d3203263
...
@@ -69,6 +69,11 @@ SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = {
...
@@ -69,6 +69,11 @@ SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = {
CTG_OP_DROP_TB_INDEX
,
CTG_OP_DROP_TB_INDEX
,
"drop tbIndex"
,
"drop tbIndex"
,
ctgOpDropTbIndex
ctgOpDropTbIndex
},
{
CTG_OP_CLEAR_CACHE
,
"clear cache"
,
ctgOpClearCache
}
}
};
};
...
@@ -81,7 +86,7 @@ int32_t ctgRLockVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache, bool *inCache) {
...
@@ -81,7 +86,7 @@ int32_t ctgRLockVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache, bool *inCache) {
if
(
dbCache
->
deleted
)
{
if
(
dbCache
->
deleted
)
{
CTG_UNLOCK
(
CTG_READ
,
&
dbCache
->
vgCache
.
vgLock
);
CTG_UNLOCK
(
CTG_READ
,
&
dbCache
->
vgCache
.
vgLock
);
ctgDebug
(
"db is dropping, dbId:%"
PRIx64
,
dbCache
->
dbId
);
ctgDebug
(
"db is dropping, dbId:
0x
%"
PRIx64
,
dbCache
->
dbId
);
*
inCache
=
false
;
*
inCache
=
false
;
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
...
@@ -92,7 +97,7 @@ int32_t ctgRLockVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache, bool *inCache) {
...
@@ -92,7 +97,7 @@ int32_t ctgRLockVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache, bool *inCache) {
CTG_UNLOCK
(
CTG_READ
,
&
dbCache
->
vgCache
.
vgLock
);
CTG_UNLOCK
(
CTG_READ
,
&
dbCache
->
vgCache
.
vgLock
);
*
inCache
=
false
;
*
inCache
=
false
;
ctgDebug
(
"db vgInfo is empty, dbId:%"
PRIx64
,
dbCache
->
dbId
);
ctgDebug
(
"db vgInfo is empty, dbId:
0x
%"
PRIx64
,
dbCache
->
dbId
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
@@ -105,7 +110,7 @@ int32_t ctgWLockVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache) {
...
@@ -105,7 +110,7 @@ int32_t ctgWLockVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache) {
CTG_LOCK
(
CTG_WRITE
,
&
dbCache
->
vgCache
.
vgLock
);
CTG_LOCK
(
CTG_WRITE
,
&
dbCache
->
vgCache
.
vgLock
);
if
(
dbCache
->
deleted
)
{
if
(
dbCache
->
deleted
)
{
ctgDebug
(
"db is dropping, dbId:%"
PRIx64
,
dbCache
->
dbId
);
ctgDebug
(
"db is dropping, dbId:
0x
%"
PRIx64
,
dbCache
->
dbId
);
CTG_UNLOCK
(
CTG_WRITE
,
&
dbCache
->
vgCache
.
vgLock
);
CTG_UNLOCK
(
CTG_WRITE
,
&
dbCache
->
vgCache
.
vgLock
);
CTG_ERR_RET
(
TSDB_CODE_CTG_DB_DROPPED
);
CTG_ERR_RET
(
TSDB_CODE_CTG_DB_DROPPED
);
}
}
...
@@ -280,27 +285,27 @@ int32_t ctgAcquireStbMetaFromCache(SCatalog* pCtg, char *dbFName, uint64_t suid,
...
@@ -280,27 +285,27 @@ int32_t ctgAcquireStbMetaFromCache(SCatalog* pCtg, char *dbFName, uint64_t suid,
int32_t
sz
=
0
;
int32_t
sz
=
0
;
char
*
stName
=
taosHashAcquire
(
dbCache
->
stbCache
,
&
suid
,
sizeof
(
suid
));
char
*
stName
=
taosHashAcquire
(
dbCache
->
stbCache
,
&
suid
,
sizeof
(
suid
));
if
(
NULL
==
stName
)
{
if
(
NULL
==
stName
)
{
ctgDebug
(
"stb %"
PRIx64
" not in cache, dbFName:%s"
,
suid
,
dbFName
);
ctgDebug
(
"stb
0x
%"
PRIx64
" not in cache, dbFName:%s"
,
suid
,
dbFName
);
goto
_return
;
goto
_return
;
}
}
pCache
=
taosHashAcquire
(
dbCache
->
tbCache
,
stName
,
strlen
(
stName
));
pCache
=
taosHashAcquire
(
dbCache
->
tbCache
,
stName
,
strlen
(
stName
));
if
(
NULL
==
pCache
)
{
if
(
NULL
==
pCache
)
{
ctgDebug
(
"stb %"
PRIx64
" name %s not in cache, dbFName:%s"
,
suid
,
stName
,
dbFName
);
ctgDebug
(
"stb
0x
%"
PRIx64
" name %s not in cache, dbFName:%s"
,
suid
,
stName
,
dbFName
);
taosHashRelease
(
dbCache
->
stbCache
,
stName
);
taosHashRelease
(
dbCache
->
stbCache
,
stName
);
goto
_return
;
goto
_return
;
}
}
CTG_LOCK
(
CTG_READ
,
&
pCache
->
metaLock
);
CTG_LOCK
(
CTG_READ
,
&
pCache
->
metaLock
);
if
(
NULL
==
pCache
->
pMeta
)
{
if
(
NULL
==
pCache
->
pMeta
)
{
ctgDebug
(
"stb %"
PRIx64
" meta not in cache, dbFName:%s"
,
suid
,
dbFName
);
ctgDebug
(
"stb
0x
%"
PRIx64
" meta not in cache, dbFName:%s"
,
suid
,
dbFName
);
goto
_return
;
goto
_return
;
}
}
*
pDb
=
dbCache
;
*
pDb
=
dbCache
;
*
pTb
=
pCache
;
*
pTb
=
pCache
;
ctgDebug
(
"stb %"
PRIx64
" meta got in cache, dbFName:%s"
,
suid
,
dbFName
);
ctgDebug
(
"stb
0x
%"
PRIx64
" meta got in cache, dbFName:%s"
,
suid
,
dbFName
);
CTG_CACHE_STAT_INC
(
tbMetaHitNum
,
1
);
CTG_CACHE_STAT_INC
(
tbMetaHitNum
,
1
);
...
@@ -434,14 +439,14 @@ int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta**
...
@@ -434,14 +439,14 @@ int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta**
if
(
NULL
==
tbCache
)
{
if
(
NULL
==
tbCache
)
{
ctgReleaseTbMetaToCache
(
pCtg
,
dbCache
,
tbCache
);
ctgReleaseTbMetaToCache
(
pCtg
,
dbCache
,
tbCache
);
taosMemoryFreeClear
(
*
pTableMeta
);
taosMemoryFreeClear
(
*
pTableMeta
);
ctgDebug
(
"stb %"
PRIx64
" meta not in cache"
,
ctx
->
tbInfo
.
suid
);
ctgDebug
(
"stb
0x
%"
PRIx64
" meta not in cache"
,
ctx
->
tbInfo
.
suid
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
STableMeta
*
stbMeta
=
tbCache
->
pMeta
;
STableMeta
*
stbMeta
=
tbCache
->
pMeta
;
if
(
stbMeta
->
suid
!=
ctx
->
tbInfo
.
suid
)
{
if
(
stbMeta
->
suid
!=
ctx
->
tbInfo
.
suid
)
{
ctgReleaseTbMetaToCache
(
pCtg
,
dbCache
,
tbCache
);
ctgReleaseTbMetaToCache
(
pCtg
,
dbCache
,
tbCache
);
ctgError
(
"stb suid
%"
PRIx64
" in stbCache mis-match, expected suid:
%"
PRIx64
,
stbMeta
->
suid
,
ctx
->
tbInfo
.
suid
);
ctgError
(
"stb suid
0x%"
PRIx64
" in stbCache mis-match, expected suid 0x
%"
PRIx64
,
stbMeta
->
suid
,
ctx
->
tbInfo
.
suid
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
}
...
@@ -492,7 +497,7 @@ int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver,
...
@@ -492,7 +497,7 @@ int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver,
*
sver
=
tbMeta
->
sversion
;
*
sver
=
tbMeta
->
sversion
;
*
tver
=
tbMeta
->
tversion
;
*
tver
=
tbMeta
->
tversion
;
ctgDebug
(
"Got tb %s ver from cache, dbFName:%s, tbType:%d, sver:%d, tver:%d, suid:%"
PRIx64
,
ctgDebug
(
"Got tb %s ver from cache, dbFName:%s, tbType:%d, sver:%d, tver:%d, suid:
0x
%"
PRIx64
,
pTableName
->
tname
,
dbFName
,
*
tbType
,
*
sver
,
*
tver
,
*
suid
);
pTableName
->
tname
,
dbFName
,
*
tbType
,
*
sver
,
*
tver
,
*
suid
);
ctgReleaseTbMetaToCache
(
pCtg
,
dbCache
,
tbCache
);
ctgReleaseTbMetaToCache
(
pCtg
,
dbCache
,
tbCache
);
...
@@ -507,14 +512,14 @@ int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver,
...
@@ -507,14 +512,14 @@ int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver,
ctgAcquireStbMetaFromCache
(
pCtg
,
dbFName
,
*
suid
,
&
dbCache
,
&
tbCache
);
ctgAcquireStbMetaFromCache
(
pCtg
,
dbFName
,
*
suid
,
&
dbCache
,
&
tbCache
);
if
(
NULL
==
tbCache
)
{
if
(
NULL
==
tbCache
)
{
ctgReleaseTbMetaToCache
(
pCtg
,
dbCache
,
tbCache
);
ctgReleaseTbMetaToCache
(
pCtg
,
dbCache
,
tbCache
);
ctgDebug
(
"stb %"
PRIx64
" meta not in cache"
,
*
suid
);
ctgDebug
(
"stb
0x
%"
PRIx64
" meta not in cache"
,
*
suid
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
STableMeta
*
stbMeta
=
tbCache
->
pMeta
;
STableMeta
*
stbMeta
=
tbCache
->
pMeta
;
if
(
stbMeta
->
suid
!=
*
suid
)
{
if
(
stbMeta
->
suid
!=
*
suid
)
{
ctgReleaseTbMetaToCache
(
pCtg
,
dbCache
,
tbCache
);
ctgReleaseTbMetaToCache
(
pCtg
,
dbCache
,
tbCache
);
ctgError
(
"stb suid
%"
PRIx64
" in stbCache mis-match, expected suid:
%"
PRIx64
,
stbMeta
->
suid
,
*
suid
);
ctgError
(
"stb suid
0x%"
PRIx64
" in stbCache mis-match, expected suid:0x
%"
PRIx64
,
stbMeta
->
suid
,
*
suid
);
CTG_ERR_RET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
CTG_ERR_RET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
}
...
@@ -990,6 +995,33 @@ _return:
...
@@ -990,6 +995,33 @@ _return:
}
}
int32_t
ctgClearCacheEnqueue
(
SCatalog
*
pCtg
,
bool
syncOp
)
{
int32_t
code
=
0
;
SCtgCacheOperation
*
op
=
taosMemoryCalloc
(
1
,
sizeof
(
SCtgCacheOperation
));
op
->
opId
=
CTG_OP_CLEAR_CACHE
;
op
->
syncOp
=
syncOp
;
SCtgClearCacheMsg
*
msg
=
taosMemoryMalloc
(
sizeof
(
SCtgClearCacheMsg
));
if
(
NULL
==
msg
)
{
ctgError
(
"malloc %d failed"
,
(
int32_t
)
sizeof
(
SCtgClearCacheMsg
));
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
msg
->
pCtg
=
pCtg
;
op
->
data
=
msg
;
CTG_ERR_JRET
(
ctgEnqueue
(
pCtg
,
op
));
return
TSDB_CODE_SUCCESS
;
_return:
taosMemoryFreeClear
(
msg
);
CTG_RET
(
code
);
}
int32_t
ctgMetaRentInit
(
SCtgRentMgmt
*
mgmt
,
uint32_t
rentSec
,
int8_t
type
)
{
int32_t
ctgMetaRentInit
(
SCtgRentMgmt
*
mgmt
,
uint32_t
rentSec
,
int8_t
type
)
{
mgmt
->
slotRIdx
=
0
;
mgmt
->
slotRIdx
=
0
;
mgmt
->
slotNum
=
rentSec
/
CTG_RENT_SLOT_SECOND
;
mgmt
->
slotNum
=
rentSec
/
CTG_RENT_SLOT_SECOND
;
...
@@ -1019,19 +1051,19 @@ int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size)
...
@@ -1019,19 +1051,19 @@ int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size)
if
(
NULL
==
slot
->
meta
)
{
if
(
NULL
==
slot
->
meta
)
{
slot
->
meta
=
taosArrayInit
(
CTG_DEFAULT_RENT_SLOT_SIZE
,
size
);
slot
->
meta
=
taosArrayInit
(
CTG_DEFAULT_RENT_SLOT_SIZE
,
size
);
if
(
NULL
==
slot
->
meta
)
{
if
(
NULL
==
slot
->
meta
)
{
qError
(
"taosArrayInit %d failed, id:%"
PRIx64
", slot idx:%d, type:%d"
,
CTG_DEFAULT_RENT_SLOT_SIZE
,
id
,
widx
,
mgmt
->
type
);
qError
(
"taosArrayInit %d failed, id:
0x
%"
PRIx64
", slot idx:%d, type:%d"
,
CTG_DEFAULT_RENT_SLOT_SIZE
,
id
,
widx
,
mgmt
->
type
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_MEM_ERROR
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
}
}
}
if
(
NULL
==
taosArrayPush
(
slot
->
meta
,
meta
))
{
if
(
NULL
==
taosArrayPush
(
slot
->
meta
,
meta
))
{
qError
(
"taosArrayPush meta to rent failed, id:%"
PRIx64
", slot idx:%d, type:%d"
,
id
,
widx
,
mgmt
->
type
);
qError
(
"taosArrayPush meta to rent failed, id:
0x
%"
PRIx64
", slot idx:%d, type:%d"
,
id
,
widx
,
mgmt
->
type
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_MEM_ERROR
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
}
slot
->
needSort
=
true
;
slot
->
needSort
=
true
;
qDebug
(
"add meta to rent, id:%"
PRIx64
", slot idx:%d, type:%d"
,
id
,
widx
,
mgmt
->
type
);
qDebug
(
"add meta to rent, id:
0x
%"
PRIx64
", slot idx:%d, type:%d"
,
id
,
widx
,
mgmt
->
type
);
_return:
_return:
...
@@ -1047,7 +1079,7 @@ int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t si
...
@@ -1047,7 +1079,7 @@ int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t si
CTG_LOCK
(
CTG_WRITE
,
&
slot
->
lock
);
CTG_LOCK
(
CTG_WRITE
,
&
slot
->
lock
);
if
(
NULL
==
slot
->
meta
)
{
if
(
NULL
==
slot
->
meta
)
{
qError
(
"empty meta slot, id:%"
PRIx64
", slot idx:%d, type:%d"
,
id
,
widx
,
mgmt
->
type
);
qError
(
"empty meta slot, id:
0x
%"
PRIx64
", slot idx:%d, type:%d"
,
id
,
widx
,
mgmt
->
type
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
}
...
@@ -1060,20 +1092,20 @@ int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t si
...
@@ -1060,20 +1092,20 @@ int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t si
void
*
orig
=
taosArraySearch
(
slot
->
meta
,
&
id
,
searchCompare
,
TD_EQ
);
void
*
orig
=
taosArraySearch
(
slot
->
meta
,
&
id
,
searchCompare
,
TD_EQ
);
if
(
NULL
==
orig
)
{
if
(
NULL
==
orig
)
{
qDebug
(
"meta not found in slot, id:%"
PRIx64
", slot idx:%d, type:%d, size:%d"
,
id
,
widx
,
mgmt
->
type
,
(
int32_t
)
taosArrayGetSize
(
slot
->
meta
));
qDebug
(
"meta not found in slot, id:
0x
%"
PRIx64
", slot idx:%d, type:%d, size:%d"
,
id
,
widx
,
mgmt
->
type
,
(
int32_t
)
taosArrayGetSize
(
slot
->
meta
));
CTG_ERR_JRET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
}
memcpy
(
orig
,
meta
,
size
);
memcpy
(
orig
,
meta
,
size
);
qDebug
(
"meta in rent updated, id:%"
PRIx64
", slot idx:%d, type:%d"
,
id
,
widx
,
mgmt
->
type
);
qDebug
(
"meta in rent updated, id:
0x
%"
PRIx64
", slot idx:%d, type:%d"
,
id
,
widx
,
mgmt
->
type
);
_return:
_return:
CTG_UNLOCK
(
CTG_WRITE
,
&
slot
->
lock
);
CTG_UNLOCK
(
CTG_WRITE
,
&
slot
->
lock
);
if
(
code
)
{
if
(
code
)
{
qDebug
(
"meta in rent update failed, will try to add it, code:%x, id:%"
PRIx64
", slot idx:%d, type:%d"
,
code
,
id
,
widx
,
mgmt
->
type
);
qDebug
(
"meta in rent update failed, will try to add it, code:%x, id:
0x
%"
PRIx64
", slot idx:%d, type:%d"
,
code
,
id
,
widx
,
mgmt
->
type
);
CTG_RET
(
ctgMetaRentAdd
(
mgmt
,
meta
,
id
,
size
));
CTG_RET
(
ctgMetaRentAdd
(
mgmt
,
meta
,
id
,
size
));
}
}
...
@@ -1088,7 +1120,7 @@ int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t sortComp
...
@@ -1088,7 +1120,7 @@ int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t sortComp
CTG_LOCK
(
CTG_WRITE
,
&
slot
->
lock
);
CTG_LOCK
(
CTG_WRITE
,
&
slot
->
lock
);
if
(
NULL
==
slot
->
meta
)
{
if
(
NULL
==
slot
->
meta
)
{
qError
(
"empty meta slot, id:%"
PRIx64
", slot idx:%d, type:%d"
,
id
,
widx
,
mgmt
->
type
);
qError
(
"empty meta slot, id:
0x
%"
PRIx64
", slot idx:%d, type:%d"
,
id
,
widx
,
mgmt
->
type
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
}
...
@@ -1100,13 +1132,13 @@ int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t sortComp
...
@@ -1100,13 +1132,13 @@ int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t sortComp
int32_t
idx
=
taosArraySearchIdx
(
slot
->
meta
,
&
id
,
searchCompare
,
TD_EQ
);
int32_t
idx
=
taosArraySearchIdx
(
slot
->
meta
,
&
id
,
searchCompare
,
TD_EQ
);
if
(
idx
<
0
)
{
if
(
idx
<
0
)
{
qError
(
"meta not found in slot, id:%"
PRIx64
", slot idx:%d, type:%d"
,
id
,
widx
,
mgmt
->
type
);
qError
(
"meta not found in slot, id:
0x
%"
PRIx64
", slot idx:%d, type:%d"
,
id
,
widx
,
mgmt
->
type
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
}
taosArrayRemove
(
slot
->
meta
,
idx
);
taosArrayRemove
(
slot
->
meta
,
idx
);
qDebug
(
"meta in rent removed, id:%"
PRIx64
", slot idx:%d, type:%d"
,
id
,
widx
,
mgmt
->
type
);
qDebug
(
"meta in rent removed, id:
0x
%"
PRIx64
", slot idx:%d, type:%d"
,
id
,
widx
,
mgmt
->
type
);
_return:
_return:
...
@@ -1219,11 +1251,11 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
...
@@ -1219,11 +1251,11 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
SDbVgVersion
vgVersion
=
{.
dbId
=
newDBCache
.
dbId
,
.
vgVersion
=
-
1
};
SDbVgVersion
vgVersion
=
{.
dbId
=
newDBCache
.
dbId
,
.
vgVersion
=
-
1
};
strncpy
(
vgVersion
.
dbFName
,
dbFName
,
sizeof
(
vgVersion
.
dbFName
));
strncpy
(
vgVersion
.
dbFName
,
dbFName
,
sizeof
(
vgVersion
.
dbFName
));
ctgDebug
(
"db added to cache, dbFName:%s, dbId:%"
PRIx64
,
dbFName
,
dbId
);
ctgDebug
(
"db added to cache, dbFName:%s, dbId:
0x
%"
PRIx64
,
dbFName
,
dbId
);
CTG_ERR_RET
(
ctgMetaRentAdd
(
&
pCtg
->
dbRent
,
&
vgVersion
,
dbId
,
sizeof
(
SDbVgVersion
)));
CTG_ERR_RET
(
ctgMetaRentAdd
(
&
pCtg
->
dbRent
,
&
vgVersion
,
dbId
,
sizeof
(
SDbVgVersion
)));
ctgDebug
(
"db added to rent, dbFName:%s, vgVersion:%d, dbId:%"
PRIx64
,
dbFName
,
vgVersion
.
vgVersion
,
dbId
);
ctgDebug
(
"db added to rent, dbFName:%s, vgVersion:%d, dbId:
0x
%"
PRIx64
,
dbFName
,
vgVersion
.
vgVersion
,
dbId
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
...
@@ -1246,7 +1278,7 @@ void ctgRemoveStbRent(SCatalog* pCtg, SCtgDBCache *dbCache) {
...
@@ -1246,7 +1278,7 @@ void ctgRemoveStbRent(SCatalog* pCtg, SCtgDBCache *dbCache) {
suid
=
taosHashGetKey
(
pIter
,
NULL
);
suid
=
taosHashGetKey
(
pIter
,
NULL
);
if
(
TSDB_CODE_SUCCESS
==
ctgMetaRentRemove
(
&
pCtg
->
stbRent
,
*
suid
,
ctgStbVersionSortCompare
,
ctgStbVersionSearchCompare
))
{
if
(
TSDB_CODE_SUCCESS
==
ctgMetaRentRemove
(
&
pCtg
->
stbRent
,
*
suid
,
ctgStbVersionSortCompare
,
ctgStbVersionSearchCompare
))
{
ctgDebug
(
"stb removed from rent, suid:%"
PRIx64
,
*
suid
);
ctgDebug
(
"stb removed from rent, suid:
0x
%"
PRIx64
,
*
suid
);
}
}
pIter
=
taosHashIterate
(
dbCache
->
stbCache
,
pIter
);
pIter
=
taosHashIterate
(
dbCache
->
stbCache
,
pIter
);
...
@@ -1257,7 +1289,7 @@ void ctgRemoveStbRent(SCatalog* pCtg, SCtgDBCache *dbCache) {
...
@@ -1257,7 +1289,7 @@ void ctgRemoveStbRent(SCatalog* pCtg, SCtgDBCache *dbCache) {
int32_t
ctgRemoveDBFromCache
(
SCatalog
*
pCtg
,
SCtgDBCache
*
dbCache
,
const
char
*
dbFName
)
{
int32_t
ctgRemoveDBFromCache
(
SCatalog
*
pCtg
,
SCtgDBCache
*
dbCache
,
const
char
*
dbFName
)
{
uint64_t
dbId
=
dbCache
->
dbId
;
uint64_t
dbId
=
dbCache
->
dbId
;
ctgInfo
(
"start to remove db from cache, dbFName:%s, dbId:%"
PRIx64
,
dbFName
,
dbCache
->
dbId
);
ctgInfo
(
"start to remove db from cache, dbFName:%s, dbId:
0x
%"
PRIx64
,
dbFName
,
dbCache
->
dbId
);
CTG_LOCK
(
CTG_WRITE
,
&
dbCache
->
dbLock
);
CTG_LOCK
(
CTG_WRITE
,
&
dbCache
->
dbLock
);
...
@@ -1268,7 +1300,7 @@ int32_t ctgRemoveDBFromCache(SCatalog* pCtg, SCtgDBCache *dbCache, const char* d
...
@@ -1268,7 +1300,7 @@ int32_t ctgRemoveDBFromCache(SCatalog* pCtg, SCtgDBCache *dbCache, const char* d
CTG_UNLOCK
(
CTG_WRITE
,
&
dbCache
->
dbLock
);
CTG_UNLOCK
(
CTG_WRITE
,
&
dbCache
->
dbLock
);
CTG_ERR_RET
(
ctgMetaRentRemove
(
&
pCtg
->
dbRent
,
dbId
,
ctgDbVgVersionSortCompare
,
ctgDbVgVersionSearchCompare
));
CTG_ERR_RET
(
ctgMetaRentRemove
(
&
pCtg
->
dbRent
,
dbId
,
ctgDbVgVersionSortCompare
,
ctgDbVgVersionSearchCompare
));
ctgDebug
(
"db removed from rent, dbFName:%s, dbId:%"
PRIx64
,
dbFName
,
dbId
);
ctgDebug
(
"db removed from rent, dbFName:%s, dbId:
0x
%"
PRIx64
,
dbFName
,
dbId
);
if
(
taosHashRemove
(
pCtg
->
dbCache
,
dbFName
,
strlen
(
dbFName
)))
{
if
(
taosHashRemove
(
pCtg
->
dbCache
,
dbFName
,
strlen
(
dbFName
)))
{
ctgInfo
(
"taosHashRemove from dbCache failed, may be removed, dbFName:%s"
,
dbFName
);
ctgInfo
(
"taosHashRemove from dbCache failed, may be removed, dbFName:%s"
,
dbFName
);
...
@@ -1276,7 +1308,7 @@ int32_t ctgRemoveDBFromCache(SCatalog* pCtg, SCtgDBCache *dbCache, const char* d
...
@@ -1276,7 +1308,7 @@ int32_t ctgRemoveDBFromCache(SCatalog* pCtg, SCtgDBCache *dbCache, const char* d
}
}
CTG_CACHE_STAT_DEC
(
dbNum
,
1
);
CTG_CACHE_STAT_DEC
(
dbNum
,
1
);
ctgInfo
(
"db removed from cache, dbFName:%s, dbId:%"
PRIx64
,
dbFName
,
dbId
);
ctgInfo
(
"db removed from cache, dbFName:%s, dbId:
0x
%"
PRIx64
,
dbFName
,
dbId
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
@@ -1339,7 +1371,7 @@ int32_t ctgUpdateRentStbVersion(SCatalog *pCtg, char* dbFName, char* tbName, uin
...
@@ -1339,7 +1371,7 @@ int32_t ctgUpdateRentStbVersion(SCatalog *pCtg, char* dbFName, char* tbName, uin
CTG_ERR_RET
(
ctgMetaRentUpdate
(
&
pCtg
->
stbRent
,
&
metaRent
,
metaRent
.
suid
,
sizeof
(
SSTableVersion
),
ctgStbVersionSortCompare
,
ctgStbVersionSearchCompare
));
CTG_ERR_RET
(
ctgMetaRentUpdate
(
&
pCtg
->
stbRent
,
&
metaRent
,
metaRent
.
suid
,
sizeof
(
SSTableVersion
),
ctgStbVersionSortCompare
,
ctgStbVersionSearchCompare
));
ctgDebug
(
"db %s,
%"
PRIx64
" stb %s,
%"
PRIx64
" sver %d tver %d smaVer %d updated to stbRent"
,
ctgDebug
(
"db %s,
0x%"
PRIx64
" stb %s,0x
%"
PRIx64
" sver %d tver %d smaVer %d updated to stbRent"
,
dbFName
,
dbId
,
tbName
,
suid
,
metaRent
.
sversion
,
metaRent
.
tversion
,
metaRent
.
smaVer
);
dbFName
,
dbId
,
tbName
,
suid
,
metaRent
.
sversion
,
metaRent
.
tversion
,
metaRent
.
smaVer
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
...
@@ -1349,7 +1381,7 @@ int32_t ctgUpdateRentStbVersion(SCatalog *pCtg, char* dbFName, char* tbName, uin
...
@@ -1349,7 +1381,7 @@ int32_t ctgUpdateRentStbVersion(SCatalog *pCtg, char* dbFName, char* tbName, uin
int32_t
ctgWriteTbMetaToCache
(
SCatalog
*
pCtg
,
SCtgDBCache
*
dbCache
,
char
*
dbFName
,
uint64_t
dbId
,
char
*
tbName
,
STableMeta
*
meta
,
int32_t
metaSize
)
{
int32_t
ctgWriteTbMetaToCache
(
SCatalog
*
pCtg
,
SCtgDBCache
*
dbCache
,
char
*
dbFName
,
uint64_t
dbId
,
char
*
tbName
,
STableMeta
*
meta
,
int32_t
metaSize
)
{
if
(
NULL
==
dbCache
->
tbCache
||
NULL
==
dbCache
->
stbCache
)
{
if
(
NULL
==
dbCache
->
tbCache
||
NULL
==
dbCache
->
stbCache
)
{
taosMemoryFree
(
meta
);
taosMemoryFree
(
meta
);
ctgError
(
"db is dropping, dbId:%"
PRIx64
,
dbCache
->
dbId
);
ctgError
(
"db is dropping, dbId:
0x
%"
PRIx64
,
dbCache
->
dbId
);
CTG_ERR_RET
(
TSDB_CODE_CTG_DB_DROPPED
);
CTG_ERR_RET
(
TSDB_CODE_CTG_DB_DROPPED
);
}
}
...
@@ -1370,10 +1402,10 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
...
@@ -1370,10 +1402,10 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
if
(
origType
==
TSDB_SUPER_TABLE
)
{
if
(
origType
==
TSDB_SUPER_TABLE
)
{
if
(
taosHashRemove
(
dbCache
->
stbCache
,
&
orig
->
suid
,
sizeof
(
orig
->
suid
)))
{
if
(
taosHashRemove
(
dbCache
->
stbCache
,
&
orig
->
suid
,
sizeof
(
orig
->
suid
)))
{
ctgError
(
"stb not exist in stbCache, dbFName:%s, stb:%s, suid:%"
PRIx64
,
dbFName
,
tbName
,
orig
->
suid
);
ctgError
(
"stb not exist in stbCache, dbFName:%s, stb:%s, suid:
0x
%"
PRIx64
,
dbFName
,
tbName
,
orig
->
suid
);
}
else
{
}
else
{
CTG_CACHE_STAT_DEC
(
stblNum
,
1
);
CTG_CACHE_STAT_DEC
(
stblNum
,
1
);
ctgDebug
(
"stb removed from stbCache, dbFName:%s, stb:%s, suid:%"
PRIx64
,
dbFName
,
tbName
,
orig
->
suid
);
ctgDebug
(
"stb removed from stbCache, dbFName:%s, stb:%s, suid:
0x
%"
PRIx64
,
dbFName
,
tbName
,
orig
->
suid
);
}
}
origSuid
=
orig
->
suid
;
origSuid
=
orig
->
suid
;
...
@@ -1407,13 +1439,13 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
...
@@ -1407,13 +1439,13 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
}
}
if
(
origSuid
!=
meta
->
suid
&&
taosHashPut
(
dbCache
->
stbCache
,
&
meta
->
suid
,
sizeof
(
meta
->
suid
),
tbName
,
strlen
(
tbName
)
+
1
)
!=
0
)
{
if
(
origSuid
!=
meta
->
suid
&&
taosHashPut
(
dbCache
->
stbCache
,
&
meta
->
suid
,
sizeof
(
meta
->
suid
),
tbName
,
strlen
(
tbName
)
+
1
)
!=
0
)
{
ctgError
(
"taosHashPut to stable cache failed, suid:%"
PRIx64
,
meta
->
suid
);
ctgError
(
"taosHashPut to stable cache failed, suid:
0x
%"
PRIx64
,
meta
->
suid
);
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
}
CTG_CACHE_STAT_INC
(
stblNum
,
1
);
CTG_CACHE_STAT_INC
(
stblNum
,
1
);
ctgDebug
(
"stb %"
PRIx64
" updated to cache, dbFName:%s, tbName:%s, tbType:%d"
,
meta
->
suid
,
dbFName
,
tbName
,
meta
->
tableType
);
ctgDebug
(
"stb
0x
%"
PRIx64
" updated to cache, dbFName:%s, tbName:%s, tbType:%d"
,
meta
->
suid
,
dbFName
,
tbName
,
meta
->
tableType
);
CTG_ERR_RET
(
ctgUpdateRentStbVersion
(
pCtg
,
dbFName
,
tbName
,
dbId
,
meta
->
suid
,
pCache
));
CTG_ERR_RET
(
ctgUpdateRentStbVersion
(
pCtg
,
dbFName
,
tbName
,
dbId
,
meta
->
suid
,
pCache
));
...
@@ -1424,7 +1456,7 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char* dbFNa
...
@@ -1424,7 +1456,7 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char* dbFNa
if
(
NULL
==
dbCache
->
tbCache
)
{
if
(
NULL
==
dbCache
->
tbCache
)
{
ctgFreeSTableIndex
(
*
index
);
ctgFreeSTableIndex
(
*
index
);
taosMemoryFreeClear
(
*
index
);
taosMemoryFreeClear
(
*
index
);
ctgError
(
"db is dropping, dbId:%"
PRIx64
,
dbCache
->
dbId
);
ctgError
(
"db is dropping, dbId:
0x
%"
PRIx64
,
dbCache
->
dbId
);
CTG_ERR_RET
(
TSDB_CODE_CTG_DB_DROPPED
);
CTG_ERR_RET
(
TSDB_CODE_CTG_DB_DROPPED
);
}
}
...
@@ -1510,7 +1542,7 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
...
@@ -1510,7 +1542,7 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
SCtgDBCache
*
dbCache
=
NULL
;
SCtgDBCache
*
dbCache
=
NULL
;
CTG_ERR_RET
(
ctgGetAddDBCache
(
msg
->
pCtg
,
dbFName
,
msg
->
dbId
,
&
dbCache
));
CTG_ERR_RET
(
ctgGetAddDBCache
(
msg
->
pCtg
,
dbFName
,
msg
->
dbId
,
&
dbCache
));
if
(
NULL
==
dbCache
)
{
if
(
NULL
==
dbCache
)
{
ctgInfo
(
"conflict db update, ignore this update, dbFName:%s, dbId:%"
PRIx64
,
dbFName
,
msg
->
dbId
);
ctgInfo
(
"conflict db update, ignore this update, dbFName:%s, dbId:
0x
%"
PRIx64
,
dbFName
,
msg
->
dbId
);
CTG_ERR_RET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
CTG_ERR_RET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
}
...
@@ -1540,7 +1572,7 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
...
@@ -1540,7 +1572,7 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
vgCache
->
vgInfo
=
dbInfo
;
vgCache
->
vgInfo
=
dbInfo
;
msg
->
dbInfo
=
NULL
;
msg
->
dbInfo
=
NULL
;
ctgDebug
(
"db vgInfo updated, dbFName:%s, vgVer:%d, dbId:%"
PRIx64
,
dbFName
,
vgVersion
.
vgVersion
,
vgVersion
.
dbId
);
ctgDebug
(
"db vgInfo updated, dbFName:%s, vgVer:%d, dbId:
0x
%"
PRIx64
,
dbFName
,
vgVersion
.
vgVersion
,
vgVersion
.
dbId
);
ctgWUnlockVgInfo
(
dbCache
);
ctgWUnlockVgInfo
(
dbCache
);
...
@@ -1569,7 +1601,7 @@ int32_t ctgOpDropDbCache(SCtgCacheOperation *operation) {
...
@@ -1569,7 +1601,7 @@ int32_t ctgOpDropDbCache(SCtgCacheOperation *operation) {
}
}
if
(
dbCache
->
dbId
!=
msg
->
dbId
)
{
if
(
dbCache
->
dbId
!=
msg
->
dbId
)
{
ctgInfo
(
"dbId already updated, dbFName:%s, dbId:
%"
PRIx64
", targetId:
%"
PRIx64
,
msg
->
dbFName
,
dbCache
->
dbId
,
msg
->
dbId
);
ctgInfo
(
"dbId already updated, dbFName:%s, dbId:
0x%"
PRIx64
", targetId:0x
%"
PRIx64
,
msg
->
dbFName
,
dbCache
->
dbId
,
msg
->
dbId
);
goto
_return
;
goto
_return
;
}
}
...
@@ -1629,7 +1661,7 @@ int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *operation) {
...
@@ -1629,7 +1661,7 @@ int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *operation) {
CTG_ERR_JRET
(
ctgGetAddDBCache
(
pCtg
,
pMeta
->
dbFName
,
pMeta
->
dbId
,
&
dbCache
));
CTG_ERR_JRET
(
ctgGetAddDBCache
(
pCtg
,
pMeta
->
dbFName
,
pMeta
->
dbId
,
&
dbCache
));
if
(
NULL
==
dbCache
)
{
if
(
NULL
==
dbCache
)
{
ctgInfo
(
"conflict db update, ignore this update, dbFName:%s, dbId:%"
PRIx64
,
pMeta
->
dbFName
,
pMeta
->
dbId
);
ctgInfo
(
"conflict db update, ignore this update, dbFName:%s, dbId:
0x
%"
PRIx64
,
pMeta
->
dbFName
,
pMeta
->
dbId
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
}
...
@@ -1673,27 +1705,28 @@ int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) {
...
@@ -1673,27 +1705,28 @@ int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) {
}
}
if
(
msg
->
dbId
&&
(
dbCache
->
dbId
!=
msg
->
dbId
))
{
if
(
msg
->
dbId
&&
(
dbCache
->
dbId
!=
msg
->
dbId
))
{
ctgDebug
(
"dbId already modified, dbFName:%s, current:%"
PRIx64
", dbId:%"
PRIx64
", stb:%s, suid:%"
PRIx64
,
msg
->
dbFName
,
dbCache
->
dbId
,
msg
->
dbId
,
msg
->
stbName
,
msg
->
suid
);
ctgDebug
(
"dbId already modified, dbFName:%s, current:0x%"
PRIx64
", dbId:0x%"
PRIx64
", stb:%s, suid:0x%"
PRIx64
,
msg
->
dbFName
,
dbCache
->
dbId
,
msg
->
dbId
,
msg
->
stbName
,
msg
->
suid
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
if
(
taosHashRemove
(
dbCache
->
stbCache
,
&
msg
->
suid
,
sizeof
(
msg
->
suid
)))
{
if
(
taosHashRemove
(
dbCache
->
stbCache
,
&
msg
->
suid
,
sizeof
(
msg
->
suid
)))
{
ctgDebug
(
"stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:%"
PRIx64
,
msg
->
dbFName
,
msg
->
stbName
,
msg
->
suid
);
ctgDebug
(
"stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:
0x
%"
PRIx64
,
msg
->
dbFName
,
msg
->
stbName
,
msg
->
suid
);
}
else
{
}
else
{
CTG_CACHE_STAT_DEC
(
stblNum
,
1
);
CTG_CACHE_STAT_DEC
(
stblNum
,
1
);
}
}
if
(
taosHashRemove
(
dbCache
->
tbCache
,
msg
->
stbName
,
strlen
(
msg
->
stbName
)))
{
if
(
taosHashRemove
(
dbCache
->
tbCache
,
msg
->
stbName
,
strlen
(
msg
->
stbName
)))
{
ctgError
(
"stb not exist in cache, dbFName:%s, stb:%s, suid:%"
PRIx64
,
msg
->
dbFName
,
msg
->
stbName
,
msg
->
suid
);
ctgError
(
"stb not exist in cache, dbFName:%s, stb:%s, suid:
0x
%"
PRIx64
,
msg
->
dbFName
,
msg
->
stbName
,
msg
->
suid
);
}
else
{
}
else
{
CTG_CACHE_STAT_DEC
(
tblNum
,
1
);
CTG_CACHE_STAT_DEC
(
tblNum
,
1
);
}
}
ctgInfo
(
"stb removed from cache, dbFName:%s, stbName:%s, suid:%"
PRIx64
,
msg
->
dbFName
,
msg
->
stbName
,
msg
->
suid
);
ctgInfo
(
"stb removed from cache, dbFName:%s, stbName:%s, suid:
0x
%"
PRIx64
,
msg
->
dbFName
,
msg
->
stbName
,
msg
->
suid
);
CTG_ERR_JRET
(
ctgMetaRentRemove
(
&
msg
->
pCtg
->
stbRent
,
msg
->
suid
,
ctgStbVersionSortCompare
,
ctgStbVersionSearchCompare
));
CTG_ERR_JRET
(
ctgMetaRentRemove
(
&
msg
->
pCtg
->
stbRent
,
msg
->
suid
,
ctgStbVersionSortCompare
,
ctgStbVersionSearchCompare
));
ctgDebug
(
"stb removed from rent, dbFName:%s, stbName:%s, suid:%"
PRIx64
,
msg
->
dbFName
,
msg
->
stbName
,
msg
->
suid
);
ctgDebug
(
"stb removed from rent, dbFName:%s, stbName:%s, suid:
0x
%"
PRIx64
,
msg
->
dbFName
,
msg
->
stbName
,
msg
->
suid
);
_return:
_return:
...
@@ -1714,7 +1747,7 @@ int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) {
...
@@ -1714,7 +1747,7 @@ int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) {
}
}
if
(
dbCache
->
dbId
!=
msg
->
dbId
)
{
if
(
dbCache
->
dbId
!=
msg
->
dbId
)
{
ctgDebug
(
"dbId
%"
PRIx64
" not match with curId
%"
PRIx64
", dbFName:%s, tbName:%s"
,
msg
->
dbId
,
dbCache
->
dbId
,
msg
->
dbFName
,
msg
->
tbName
);
ctgDebug
(
"dbId
0x%"
PRIx64
" not match with curId 0x
%"
PRIx64
", dbFName:%s, tbName:%s"
,
msg
->
dbId
,
dbCache
->
dbId
,
msg
->
dbFName
,
msg
->
tbName
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
@@ -1898,6 +1931,37 @@ _return:
...
@@ -1898,6 +1931,37 @@ _return:
}
}
int32_t
ctgOpClearCache
(
SCtgCacheOperation
*
operation
)
{
int32_t
code
=
0
;
SCtgClearCacheMsg
*
msg
=
operation
->
data
;
SCatalog
*
pCtg
=
msg
->
pCtg
;
if
(
pCtg
)
{
catalogFreeHandle
(
pCtg
);
goto
_return
;
}
void
*
pIter
=
taosHashIterate
(
gCtgMgmt
.
pCluster
,
NULL
);
while
(
pIter
)
{
pCtg
=
*
(
SCatalog
**
)
pIter
;
if
(
pCtg
)
{
catalogFreeHandle
(
pCtg
);
}
pIter
=
taosHashIterate
(
gCtgMgmt
.
pCluster
,
pIter
);
}
taosHashClear
(
gCtgMgmt
.
pCluster
);
_return:
taosMemoryFreeClear
(
msg
);
CTG_RET
(
code
);
}
void
ctgUpdateThreadUnexpectedStopped
(
void
)
{
void
ctgUpdateThreadUnexpectedStopped
(
void
)
{
if
(
!
atomic_load_8
((
int8_t
*
)
&
gCtgMgmt
.
exit
)
&&
CTG_IS_LOCKED
(
&
gCtgMgmt
.
lock
)
>
0
)
CTG_UNLOCK
(
CTG_READ
,
&
gCtgMgmt
.
lock
);
if
(
!
atomic_load_8
((
int8_t
*
)
&
gCtgMgmt
.
exit
)
&&
CTG_IS_LOCKED
(
&
gCtgMgmt
.
lock
)
>
0
)
CTG_UNLOCK
(
CTG_READ
,
&
gCtgMgmt
.
lock
);
}
}
...
@@ -1969,6 +2033,7 @@ void* ctgUpdateThreadFunc(void* param) {
...
@@ -1969,6 +2033,7 @@ void* ctgUpdateThreadFunc(void* param) {
CTG_RT_STAT_INC
(
qDoneNum
,
1
);
CTG_RT_STAT_INC
(
qDoneNum
,
1
);
ctgdShowCacheInfo
();
ctgdShowClusterCache
(
pCtg
);
ctgdShowClusterCache
(
pCtg
);
}
}
...
...
source/libs/catalog/src/ctgDbg.c
浏览文件 @
d3203263
...
@@ -19,7 +19,7 @@
...
@@ -19,7 +19,7 @@
#include "catalogInt.h"
#include "catalogInt.h"
extern
SCatalogMgmt
gCtgMgmt
;
extern
SCatalogMgmt
gCtgMgmt
;
SCtgDebug
gCTGDebug
=
{.
lockEnable
=
true
,
.
api
Enable
=
true
};
SCtgDebug
gCTGDebug
=
{.
cache
Enable
=
true
};
void
ctgdUserCallback
(
SMetaData
*
pResult
,
void
*
param
,
int32_t
code
)
{
void
ctgdUserCallback
(
SMetaData
*
pResult
,
void
*
param
,
int32_t
code
)
{
ASSERT
(
*
(
int32_t
*
)
param
==
1
);
ASSERT
(
*
(
int32_t
*
)
param
==
1
);
...
@@ -40,9 +40,9 @@ void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) {
...
@@ -40,9 +40,9 @@ void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) {
STableComInfo
*
c
=
&
p
->
tableInfo
;
STableComInfo
*
c
=
&
p
->
tableInfo
;
if
(
TSDB_CHILD_TABLE
==
p
->
tableType
)
{
if
(
TSDB_CHILD_TABLE
==
p
->
tableType
)
{
qDebug
(
"table meta: type:%d, vgId:%d, uid:
%"
PRIx64
",suid:
%"
PRIx64
,
p
->
tableType
,
p
->
vgId
,
p
->
uid
,
p
->
suid
);
qDebug
(
"table meta: type:%d, vgId:%d, uid:
0x%"
PRIx64
",suid:0x
%"
PRIx64
,
p
->
tableType
,
p
->
vgId
,
p
->
uid
,
p
->
suid
);
}
else
{
}
else
{
qDebug
(
"table meta: type:%d, vgId:%d, uid:
%"
PRIx64
",suid:
%"
PRIx64
",sv:%d, tv:%d, tagNum:%d, precision:%d, colNum:%d, rowSize:%d"
,
qDebug
(
"table meta: type:%d, vgId:%d, uid:
0x%"
PRIx64
",suid:0x
%"
PRIx64
",sv:%d, tv:%d, tagNum:%d, precision:%d, colNum:%d, rowSize:%d"
,
p
->
tableType
,
p
->
vgId
,
p
->
uid
,
p
->
suid
,
p
->
sversion
,
p
->
tversion
,
c
->
numOfTags
,
c
->
precision
,
c
->
numOfColumns
,
c
->
rowSize
);
p
->
tableType
,
p
->
vgId
,
p
->
uid
,
p
->
suid
,
p
->
sversion
,
p
->
tversion
,
c
->
numOfTags
,
c
->
precision
,
c
->
numOfColumns
,
c
->
rowSize
);
}
}
...
@@ -75,7 +75,7 @@ void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) {
...
@@ -75,7 +75,7 @@ void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) {
num
=
taosArrayGetSize
(
pResult
->
pDbInfo
);
num
=
taosArrayGetSize
(
pResult
->
pDbInfo
);
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
SDbInfo
*
pDb
=
taosArrayGet
(
pResult
->
pDbInfo
,
i
);
SDbInfo
*
pDb
=
taosArrayGet
(
pResult
->
pDbInfo
,
i
);
qDebug
(
"db %d dbInfo: vgVer:%d, tbNum:%d, dbId:%"
PRIx64
,
i
,
pDb
->
vgVer
,
pDb
->
tbNum
,
pDb
->
dbId
);
qDebug
(
"db %d dbInfo: vgVer:%d, tbNum:%d, dbId:
0x
%"
PRIx64
,
i
,
pDb
->
vgVer
,
pDb
->
tbNum
,
pDb
->
dbId
);
}
}
}
else
{
}
else
{
qDebug
(
"empty db info"
);
qDebug
(
"empty db info"
);
...
@@ -333,10 +333,10 @@ void ctgdShowTableMeta(SCatalog* pCtg, const char *tbName, STableMeta* p) {
...
@@ -333,10 +333,10 @@ void ctgdShowTableMeta(SCatalog* pCtg, const char *tbName, STableMeta* p) {
STableComInfo
*
c
=
&
p
->
tableInfo
;
STableComInfo
*
c
=
&
p
->
tableInfo
;
if
(
TSDB_CHILD_TABLE
==
p
->
tableType
)
{
if
(
TSDB_CHILD_TABLE
==
p
->
tableType
)
{
ctgDebug
(
"table [%s] meta: type:%d, vgId:%d, uid:
%"
PRIx64
",suid:
%"
PRIx64
,
tbName
,
p
->
tableType
,
p
->
vgId
,
p
->
uid
,
p
->
suid
);
ctgDebug
(
"table [%s] meta: type:%d, vgId:%d, uid:
0x%"
PRIx64
",suid:0x
%"
PRIx64
,
tbName
,
p
->
tableType
,
p
->
vgId
,
p
->
uid
,
p
->
suid
);
return
;
return
;
}
else
{
}
else
{
ctgDebug
(
"table [%s] meta: type:%d, vgId:%d, uid:
%"
PRIx64
",suid:
%"
PRIx64
",sv:%d, tv:%d, tagNum:%d, precision:%d, colNum:%d, rowSize:%d"
,
ctgDebug
(
"table [%s] meta: type:%d, vgId:%d, uid:
0x%"
PRIx64
",suid:0x
%"
PRIx64
",sv:%d, tv:%d, tagNum:%d, precision:%d, colNum:%d, rowSize:%d"
,
tbName
,
p
->
tableType
,
p
->
vgId
,
p
->
uid
,
p
->
suid
,
p
->
sversion
,
p
->
tversion
,
c
->
numOfTags
,
c
->
precision
,
c
->
numOfColumns
,
c
->
rowSize
);
tbName
,
p
->
tableType
,
p
->
vgId
,
p
->
uid
,
p
->
suid
,
p
->
sversion
,
p
->
tversion
,
c
->
numOfTags
,
c
->
precision
,
c
->
numOfColumns
,
c
->
rowSize
);
}
}
...
@@ -377,7 +377,7 @@ void ctgdShowDBCache(SCatalog* pCtg, SHashObj *dbHash) {
...
@@ -377,7 +377,7 @@ void ctgdShowDBCache(SCatalog* pCtg, SHashObj *dbHash) {
}
}
}
}
ctgDebug
(
"[%d] db [%.*s][%"
PRIx64
"] %s: metaNum:%d, stbNum:%d, vgVersion:%d, hashMethod:%d, vgNum:%d"
,
ctgDebug
(
"[%d] db [%.*s][
0x
%"
PRIx64
"] %s: metaNum:%d, stbNum:%d, vgVersion:%d, hashMethod:%d, vgNum:%d"
,
i
,
(
int32_t
)
len
,
dbFName
,
dbCache
->
dbId
,
dbCache
->
deleted
?
"deleted"
:
""
,
metaNum
,
stbNum
,
vgVersion
,
hashMethod
,
vgNum
);
i
,
(
int32_t
)
len
,
dbFName
,
dbCache
->
dbId
,
dbCache
->
deleted
?
"deleted"
:
""
,
metaNum
,
stbNum
,
vgVersion
,
hashMethod
,
vgNum
);
pIter
=
taosHashIterate
(
dbHash
,
pIter
);
pIter
=
taosHashIterate
(
dbHash
,
pIter
);
...
@@ -392,13 +392,13 @@ void ctgdShowClusterCache(SCatalog* pCtg) {
...
@@ -392,13 +392,13 @@ void ctgdShowClusterCache(SCatalog* pCtg) {
return
;
return
;
}
}
ctgDebug
(
"## cluster %"
PRIx64
" %p cache Info BEGIN ##"
,
pCtg
->
clusterId
,
pCtg
);
ctgDebug
(
"## cluster
0x
%"
PRIx64
" %p cache Info BEGIN ##"
,
pCtg
->
clusterId
,
pCtg
);
ctgDebug
(
"db:%d meta:%d stb:%d dbRent:%d stbRent:%d"
,
ctgdGetClusterCacheNum
(
pCtg
,
CTG_DBG_DB_NUM
),
ctgdGetClusterCacheNum
(
pCtg
,
CTG_DBG_META_NUM
),
ctgDebug
(
"db:%d meta:%d stb:%d dbRent:%d stbRent:%d"
,
ctgdGetClusterCacheNum
(
pCtg
,
CTG_DBG_DB_NUM
),
ctgdGetClusterCacheNum
(
pCtg
,
CTG_DBG_META_NUM
),
ctgdGetClusterCacheNum
(
pCtg
,
CTG_DBG_STB_NUM
),
ctgdGetClusterCacheNum
(
pCtg
,
CTG_DBG_DB_RENT_NUM
),
ctgdGetClusterCacheNum
(
pCtg
,
CTG_DBG_STB_RENT_NUM
));
ctgdGetClusterCacheNum
(
pCtg
,
CTG_DBG_STB_NUM
),
ctgdGetClusterCacheNum
(
pCtg
,
CTG_DBG_DB_RENT_NUM
),
ctgdGetClusterCacheNum
(
pCtg
,
CTG_DBG_STB_RENT_NUM
));
ctgdShowDBCache
(
pCtg
,
pCtg
->
dbCache
);
ctgdShowDBCache
(
pCtg
,
pCtg
->
dbCache
);
ctgDebug
(
"## cluster %"
PRIx64
" %p cache Info END ##"
,
pCtg
->
clusterId
,
pCtg
);
ctgDebug
(
"## cluster
0x
%"
PRIx64
" %p cache Info END ##"
,
pCtg
->
clusterId
,
pCtg
);
}
}
int32_t
ctgdShowCacheInfo
(
void
)
{
int32_t
ctgdShowCacheInfo
(
void
)
{
...
@@ -408,6 +408,8 @@ int32_t ctgdShowCacheInfo(void) {
...
@@ -408,6 +408,8 @@ int32_t ctgdShowCacheInfo(void) {
CTG_API_ENTER
();
CTG_API_ENTER
();
qDebug
(
"# total catalog cluster number %d #"
,
taosHashGetSize
(
gCtgMgmt
.
pCluster
));
SCatalog
*
pCtg
=
NULL
;
SCatalog
*
pCtg
=
NULL
;
void
*
pIter
=
taosHashIterate
(
gCtgMgmt
.
pCluster
,
NULL
);
void
*
pIter
=
taosHashIterate
(
gCtgMgmt
.
pCluster
,
NULL
);
while
(
pIter
)
{
while
(
pIter
)
{
...
...
source/libs/catalog/src/ctgRemote.c
浏览文件 @
d3203263
...
@@ -186,13 +186,13 @@ int32_t ctgHandleMsgCallback(void *param, const SDataBuf *pMsg, int32_t rspCode)
...
@@ -186,13 +186,13 @@ int32_t ctgHandleMsgCallback(void *param, const SDataBuf *pMsg, int32_t rspCode)
SCtgJob
*
pJob
=
taosAcquireRef
(
gCtgMgmt
.
jobPool
,
cbParam
->
refId
);
SCtgJob
*
pJob
=
taosAcquireRef
(
gCtgMgmt
.
jobPool
,
cbParam
->
refId
);
if
(
NULL
==
pJob
)
{
if
(
NULL
==
pJob
)
{
qDebug
(
"
job refId
%"
PRIx64
" already dropped"
,
cbParam
->
refId
);
qDebug
(
"
ctg job refId 0x
%"
PRIx64
" already dropped"
,
cbParam
->
refId
);
goto
_return
;
goto
_return
;
}
}
SCtgTask
*
pTask
=
taosArrayGet
(
pJob
->
pTasks
,
cbParam
->
taskId
);
SCtgTask
*
pTask
=
taosArrayGet
(
pJob
->
pTasks
,
cbParam
->
taskId
);
qDebug
(
"QID:0x%"
PRIx64
" task %d start to handle rsp %s"
,
pJob
->
queryId
,
pTask
->
taskId
,
TMSG_INFO
(
cbParam
->
reqType
+
1
));
qDebug
(
"QID:0x%"
PRIx64
"
ctg
task %d start to handle rsp %s"
,
pJob
->
queryId
,
pTask
->
taskId
,
TMSG_INFO
(
cbParam
->
reqType
+
1
));
CTG_ERR_JRET
((
*
gCtgAsyncFps
[
pTask
->
type
].
handleRspFp
)(
pTask
,
cbParam
->
reqType
,
pMsg
,
rspCode
));
CTG_ERR_JRET
((
*
gCtgAsyncFps
[
pTask
->
type
].
handleRspFp
)(
pTask
,
cbParam
->
reqType
,
pMsg
,
rspCode
));
...
@@ -263,7 +263,7 @@ int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTask* pTask
...
@@ -263,7 +263,7 @@ int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTask* pTask
CTG_ERR_JRET
(
code
);
CTG_ERR_JRET
(
code
);
}
}
ctgDebug
(
"req msg sent, reqId:0x%"
PRIx64
", msg type:%d, %s"
,
pTask
->
pJob
->
queryId
,
msgType
,
TMSG_INFO
(
msgType
));
ctgDebug
(
"
ctg
req msg sent, reqId:0x%"
PRIx64
", msg type:%d, %s"
,
pTask
->
pJob
->
queryId
,
msgType
,
TMSG_INFO
(
msgType
));
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
_return:
_return:
...
...
source/libs/catalog/src/ctgUtil.c
浏览文件 @
d3203263
...
@@ -434,7 +434,7 @@ void ctgFreeJob(void* job) {
...
@@ -434,7 +434,7 @@ void ctgFreeJob(void* job) {
taosMemoryFree
(
job
);
taosMemoryFree
(
job
);
qDebug
(
"QID:
%"
PRIx64
", job
%"
PRIx64
" freed"
,
qid
,
rid
);
qDebug
(
"QID:
0x%"
PRIx64
", ctg job 0x
%"
PRIx64
" freed"
,
qid
,
rid
);
}
}
int32_t
ctgUpdateMsgCtx
(
SCtgMsgCtx
*
pCtx
,
int32_t
reqType
,
void
*
out
,
char
*
target
)
{
int32_t
ctgUpdateMsgCtx
(
SCtgMsgCtx
*
pCtx
,
int32_t
reqType
,
void
*
out
,
char
*
target
)
{
...
...
source/libs/command/src/command.c
浏览文件 @
d3203263
...
@@ -14,6 +14,7 @@
...
@@ -14,6 +14,7 @@
*/
*/
#include "command.h"
#include "command.h"
#include "catalog.h"
#include "tdatablock.h"
#include "tdatablock.h"
static
int32_t
getSchemaBytes
(
const
SSchema
*
pSchema
)
{
static
int32_t
getSchemaBytes
(
const
SSchema
*
pSchema
)
{
...
@@ -120,8 +121,7 @@ static int32_t execDescribe(SNode* pStmt, SRetrieveTableRsp** pRsp) {
...
@@ -120,8 +121,7 @@ static int32_t execDescribe(SNode* pStmt, SRetrieveTableRsp** pRsp) {
}
}
static
int32_t
execResetQueryCache
()
{
static
int32_t
execResetQueryCache
()
{
// todo
return
catalogClearCache
();
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
qExecCommand
(
SNode
*
pStmt
,
SRetrieveTableRsp
**
pRsp
)
{
int32_t
qExecCommand
(
SNode
*
pStmt
,
SRetrieveTableRsp
**
pRsp
)
{
...
...
source/libs/qworker/src/qworker.c
浏览文件 @
d3203263
...
@@ -614,6 +614,8 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
...
@@ -614,6 +614,8 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_SET_EVENT_PROCESSED
(
ctx
,
QW_EVENT_FETCH
);
QW_SET_EVENT_PROCESSED
(
ctx
,
QW_EVENT_FETCH
);
qwBuildAndSendFetchRsp
(
&
qwMsg
->
connInfo
,
rsp
,
dataLen
,
code
);
qwBuildAndSendFetchRsp
(
&
qwMsg
->
connInfo
,
rsp
,
dataLen
,
code
);
rsp
=
NULL
;
QW_TASK_DLOG
(
"fetch rsp send, handle:%p, code:%x - %s, dataLen:%d"
,
qwMsg
->
connInfo
.
handle
,
code
,
QW_TASK_DLOG
(
"fetch rsp send, handle:%p, code:%x - %s, dataLen:%d"
,
qwMsg
->
connInfo
.
handle
,
code
,
tstrerror
(
code
),
dataLen
);
tstrerror
(
code
),
dataLen
);
}
else
{
}
else
{
...
@@ -633,7 +635,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
...
@@ -633,7 +635,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
rsp
=
NULL
;
rsp
=
NULL
;
qwMsg
->
connInfo
=
ctx
->
dataConnInfo
;
qwMsg
->
connInfo
=
ctx
->
dataConnInfo
;
qwBuildAndSendFetchRsp
(
&
qwMsg
->
connInfo
,
rsp
,
0
,
code
);
qwBuildAndSendFetchRsp
(
&
qwMsg
->
connInfo
,
NULL
,
0
,
code
);
QW_TASK_DLOG
(
"fetch rsp send, handle:%p, code:%x - %s, dataLen:%d"
,
qwMsg
->
connInfo
.
handle
,
code
,
tstrerror
(
code
),
QW_TASK_DLOG
(
"fetch rsp send, handle:%p, code:%x - %s, dataLen:%d"
,
qwMsg
->
connInfo
.
handle
,
code
,
tstrerror
(
code
),
0
);
0
);
}
}
...
...
source/libs/scheduler/src/schJob.c
浏览文件 @
d3203263
...
@@ -21,9 +21,9 @@
...
@@ -21,9 +21,9 @@
#include "tref.h"
#include "tref.h"
#include "trpc.h"
#include "trpc.h"
FORCE_INLINE
SSchJob
*
schAcquireJob
(
int64_t
refId
)
{
qDebug
(
"acquire jobId:0x%"
PRIx64
,
refId
);
return
(
SSchJob
*
)
taosAcquireRef
(
schMgmt
.
jobRef
,
refId
);
}
FORCE_INLINE
SSchJob
*
schAcquireJob
(
int64_t
refId
)
{
qDebug
(
"
sch
acquire jobId:0x%"
PRIx64
,
refId
);
return
(
SSchJob
*
)
taosAcquireRef
(
schMgmt
.
jobRef
,
refId
);
}
FORCE_INLINE
int32_t
schReleaseJob
(
int64_t
refId
)
{
qDebug
(
"release jobId:0x%"
PRIx64
,
refId
);
return
taosReleaseRef
(
schMgmt
.
jobRef
,
refId
);
}
FORCE_INLINE
int32_t
schReleaseJob
(
int64_t
refId
)
{
qDebug
(
"
sch
release jobId:0x%"
PRIx64
,
refId
);
return
taosReleaseRef
(
schMgmt
.
jobRef
,
refId
);
}
int32_t
schInitTask
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SSubplan
*
pPlan
,
SSchLevel
*
pLevel
)
{
int32_t
schInitTask
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SSubplan
*
pPlan
,
SSchLevel
*
pLevel
)
{
pTask
->
plan
=
pPlan
;
pTask
->
plan
=
pPlan
;
...
@@ -47,7 +47,7 @@ int32_t schInitJob(SSchedulerReq *pReq, SSchJob **pSchJob, SQueryResult* pRes, b
...
@@ -47,7 +47,7 @@ int32_t schInitJob(SSchedulerReq *pReq, SSchJob **pSchJob, SQueryResult* pRes, b
int64_t
refId
=
-
1
;
int64_t
refId
=
-
1
;
SSchJob
*
pJob
=
taosMemoryCalloc
(
1
,
sizeof
(
SSchJob
));
SSchJob
*
pJob
=
taosMemoryCalloc
(
1
,
sizeof
(
SSchJob
));
if
(
NULL
==
pJob
)
{
if
(
NULL
==
pJob
)
{
qError
(
"QID:%"
PRIx64
" calloc %d failed"
,
pReq
->
pDag
->
queryId
,
(
int32_t
)
sizeof
(
SSchJob
));
qError
(
"QID:
0x
%"
PRIx64
" calloc %d failed"
,
pReq
->
pDag
->
queryId
,
(
int32_t
)
sizeof
(
SSchJob
));
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
...
@@ -108,7 +108,7 @@ int32_t schInitJob(SSchedulerReq *pReq, SSchJob **pSchJob, SQueryResult* pRes, b
...
@@ -108,7 +108,7 @@ int32_t schInitJob(SSchedulerReq *pReq, SSchJob **pSchJob, SQueryResult* pRes, b
atomic_add_fetch_32
(
&
schMgmt
.
jobNum
,
1
);
atomic_add_fetch_32
(
&
schMgmt
.
jobNum
,
1
);
if
(
NULL
==
schAcquireJob
(
refId
))
{
if
(
NULL
==
schAcquireJob
(
refId
))
{
SCH_JOB_ELOG
(
"schAcquireJob job failed, refId:%"
PRIx64
,
refId
);
SCH_JOB_ELOG
(
"schAcquireJob job failed, refId:
0x
%"
PRIx64
,
refId
);
SCH_ERR_JRET
(
TSDB_CODE_SCH_STATUS_ERROR
);
SCH_ERR_JRET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
}
...
@@ -194,7 +194,7 @@ FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
...
@@ -194,7 +194,7 @@ FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
*
pStatus
=
status
;
*
pStatus
=
status
;
}
}
if
(
pJob
->
reqKilled
)
{
if
(
*
pJob
->
reqKilled
)
{
schUpdateJobStatus
(
pJob
,
JOB_TASK_STATUS_DROPPING
);
schUpdateJobStatus
(
pJob
,
JOB_TASK_STATUS_DROPPING
);
schUpdateJobErrCode
(
pJob
,
TSDB_CODE_TSC_QUERY_KILLED
);
schUpdateJobErrCode
(
pJob
,
TSDB_CODE_TSC_QUERY_KILLED
);
...
@@ -229,7 +229,7 @@ int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
...
@@ -229,7 +229,7 @@ int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
break
;
break
;
case
JOB_TASK_STATUS_NOT_START
:
case
JOB_TASK_STATUS_NOT_START
:
if
(
newStatus
!=
JOB_TASK_STATUS_EXECUTING
)
{
if
(
newStatus
!=
JOB_TASK_STATUS_EXECUTING
&&
newStatus
!=
JOB_TASK_STATUS_DROPPING
)
{
SCH_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
}
...
@@ -299,7 +299,7 @@ int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync) {
...
@@ -299,7 +299,7 @@ int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync) {
int8_t
status
=
0
;
int8_t
status
=
0
;
if
(
schJobNeedToStop
(
pJob
,
&
status
))
{
if
(
schJobNeedToStop
(
pJob
,
&
status
))
{
SCH_JOB_ELOG
(
"
job need to stop cause of status %s"
,
jobTaskStatusStr
(
status
));
SCH_JOB_ELOG
(
"
abort op %s cause of job need to stop"
,
schGetOpStr
(
type
));
SCH_ERR_JRET
(
pJob
->
errCode
);
SCH_ERR_JRET
(
pJob
->
errCode
);
}
}
...
@@ -308,7 +308,7 @@ int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync) {
...
@@ -308,7 +308,7 @@ int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync) {
SCH_ERR_JRET
(
TSDB_CODE_TSC_APP_ERROR
);
SCH_ERR_JRET
(
TSDB_CODE_TSC_APP_ERROR
);
}
}
SCH_JOB_
E
LOG
(
"job start %s operation"
,
schGetOpStr
(
pJob
->
opStatus
.
op
));
SCH_JOB_
D
LOG
(
"job start %s operation"
,
schGetOpStr
(
pJob
->
opStatus
.
op
));
pJob
->
opStatus
.
sync
=
sync
;
pJob
->
opStatus
.
sync
=
sync
;
...
@@ -377,7 +377,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
...
@@ -377,7 +377,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
SCH_TASK_DLOG
(
"children info, the %d child TID %"
PRIx64
,
n
,
(
*
childTask
)
->
taskId
);
SCH_TASK_DLOG
(
"children info, the %d child TID
0x
%"
PRIx64
,
n
,
(
*
childTask
)
->
taskId
);
}
}
if
(
parentNum
>
0
)
{
if
(
parentNum
>
0
)
{
...
@@ -411,7 +411,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
...
@@ -411,7 +411,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
SCH_TASK_DLOG
(
"parents info, the %d parent TID %"
PRIx64
,
n
,
(
*
parentTask
)
->
taskId
);
SCH_TASK_DLOG
(
"parents info, the %d parent TID
0x
%"
PRIx64
,
n
,
(
*
parentTask
)
->
taskId
);
}
}
SCH_TASK_DLOG
(
"level:%d, parentNum:%d, childNum:%d"
,
i
,
parentNum
,
childNum
);
SCH_TASK_DLOG
(
"level:%d, parentNum:%d, childNum:%d"
,
i
,
parentNum
,
childNum
);
...
@@ -981,7 +981,9 @@ int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCod
...
@@ -981,7 +981,9 @@ int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCod
schUpdateJobErrCode
(
pJob
,
errCode
);
schUpdateJobErrCode
(
pJob
,
errCode
);
int32_t
code
=
atomic_load_32
(
&
pJob
->
errCode
);
int32_t
code
=
atomic_load_32
(
&
pJob
->
errCode
);
if
(
code
)
{
SCH_JOB_DLOG
(
"job failed with error: %s"
,
tstrerror
(
code
));
SCH_JOB_DLOG
(
"job failed with error: %s"
,
tstrerror
(
code
));
}
schPostJobRes
(
pJob
,
0
);
schPostJobRes
(
pJob
,
0
);
...
@@ -1174,7 +1176,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
...
@@ -1174,7 +1176,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
SCH_UNLOCK
(
SCH_WRITE
,
&
parent
->
lock
);
SCH_UNLOCK
(
SCH_WRITE
,
&
parent
->
lock
);
if
(
SCH_TASK_READY_FOR_LAUNCH
(
readyNum
,
parent
))
{
if
(
SCH_TASK_READY_FOR_LAUNCH
(
readyNum
,
parent
))
{
SCH_TASK_DLOG
(
"all %d children task done, start to launch parent task %"
PRIx64
,
readyNum
,
parent
->
taskId
);
SCH_TASK_DLOG
(
"all %d children task done, start to launch parent task
0x
%"
PRIx64
,
readyNum
,
parent
->
taskId
);
SCH_ERR_RET
(
schLaunchTask
(
pJob
,
parent
));
SCH_ERR_RET
(
schLaunchTask
(
pJob
,
parent
));
}
}
}
}
...
@@ -1347,7 +1349,7 @@ int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTas
...
@@ -1347,7 +1349,7 @@ int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTas
int32_t
schGetTaskInJob
(
SSchJob
*
pJob
,
uint64_t
taskId
,
SSchTask
**
pTask
)
{
int32_t
schGetTaskInJob
(
SSchJob
*
pJob
,
uint64_t
taskId
,
SSchTask
**
pTask
)
{
schGetTaskFromList
(
pJob
->
taskList
,
taskId
,
pTask
);
schGetTaskFromList
(
pJob
->
taskList
,
taskId
,
pTask
);
if
(
NULL
==
*
pTask
)
{
if
(
NULL
==
*
pTask
)
{
SCH_JOB_ELOG
(
"task not found in job task list, taskId:%"
PRIx64
,
taskId
);
SCH_JOB_ELOG
(
"task not found in job task list, taskId:
0x
%"
PRIx64
,
taskId
);
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
}
...
@@ -1520,9 +1522,9 @@ void schFreeJobImpl(void *job) {
...
@@ -1520,9 +1522,9 @@ void schFreeJobImpl(void *job) {
taosMemoryFreeClear
(
pJob
->
userRes
.
queryRes
);
taosMemoryFreeClear
(
pJob
->
userRes
.
queryRes
);
taosMemoryFreeClear
(
pJob
->
resData
);
taosMemoryFreeClear
(
pJob
->
resData
);
taosMemoryFree
Clear
(
pJob
);
taosMemoryFree
(
pJob
);
qDebug
(
"QID:0x%"
PRIx64
"
job freed, refId:
%"
PRIx64
", pointer:%p"
,
queryId
,
refId
,
pJob
);
qDebug
(
"QID:0x%"
PRIx64
"
sch job freed, refId:0x
%"
PRIx64
", pointer:%p"
,
queryId
,
refId
,
pJob
);
int32_t
jobNum
=
atomic_sub_fetch_32
(
&
schMgmt
.
jobNum
,
1
);
int32_t
jobNum
=
atomic_sub_fetch_32
(
&
schMgmt
.
jobNum
,
1
);
if
(
jobNum
==
0
)
{
if
(
jobNum
==
0
)
{
...
@@ -1614,7 +1616,7 @@ int32_t schExecStaticExplainJob(SSchedulerReq *pReq, int64_t *job, bool sync) {
...
@@ -1614,7 +1616,7 @@ int32_t schExecStaticExplainJob(SSchedulerReq *pReq, int64_t *job, bool sync) {
int32_t
code
=
0
;
int32_t
code
=
0
;
SSchJob
*
pJob
=
taosMemoryCalloc
(
1
,
sizeof
(
SSchJob
));
SSchJob
*
pJob
=
taosMemoryCalloc
(
1
,
sizeof
(
SSchJob
));
if
(
NULL
==
pJob
)
{
if
(
NULL
==
pJob
)
{
qError
(
"QID:%"
PRIx64
" calloc %d failed"
,
pReq
->
pDag
->
queryId
,
(
int32_t
)
sizeof
(
SSchJob
));
qError
(
"QID:
0x
%"
PRIx64
" calloc %d failed"
,
pReq
->
pDag
->
queryId
,
(
int32_t
)
sizeof
(
SSchJob
));
code
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
code
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
pReq
->
fp
(
NULL
,
pReq
->
cbParam
,
code
);
pReq
->
fp
(
NULL
,
pReq
->
cbParam
,
code
);
SCH_ERR_RET
(
code
);
SCH_ERR_RET
(
code
);
...
@@ -1643,13 +1645,13 @@ int32_t schExecStaticExplainJob(SSchedulerReq *pReq, int64_t *job, bool sync) {
...
@@ -1643,13 +1645,13 @@ int32_t schExecStaticExplainJob(SSchedulerReq *pReq, int64_t *job, bool sync) {
}
}
if
(
NULL
==
schAcquireJob
(
refId
))
{
if
(
NULL
==
schAcquireJob
(
refId
))
{
SCH_JOB_ELOG
(
"schAcquireJob job failed, refId:%"
PRIx64
,
refId
);
SCH_JOB_ELOG
(
"schAcquireJob job failed, refId:
0x
%"
PRIx64
,
refId
);
SCH_ERR_JRET
(
TSDB_CODE_SCH_STATUS_ERROR
);
SCH_ERR_JRET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
}
pJob
->
refId
=
refId
;
pJob
->
refId
=
refId
;
SCH_JOB_DLOG
(
"job refId:%"
PRIx64
,
pJob
->
refId
);
SCH_JOB_DLOG
(
"job refId:
0x
%"
PRIx64
,
pJob
->
refId
);
pJob
->
status
=
JOB_TASK_STATUS_PARTIAL_SUCCEED
;
pJob
->
status
=
JOB_TASK_STATUS_PARTIAL_SUCCEED
;
...
...
source/libs/scheduler/src/schRemote.c
浏览文件 @
d3203263
...
@@ -344,7 +344,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
...
@@ -344,7 +344,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
}
}
case
TDMT_VND_DROP_TASK_RSP
:
{
case
TDMT_VND_DROP_TASK_RSP
:
{
// SHOULD NEVER REACH HERE
// SHOULD NEVER REACH HERE
SCH_TASK_ELOG
(
"invalid status to handle drop task rsp, refId:%"
PRIx64
,
pJob
->
refId
);
SCH_TASK_ELOG
(
"invalid status to handle drop task rsp, refId:
0x
%"
PRIx64
,
pJob
->
refId
);
SCH_ERR_JRET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
SCH_ERR_JRET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
break
;
break
;
}
}
...
@@ -374,7 +374,7 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in
...
@@ -374,7 +374,7 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in
SSchJob
*
pJob
=
schAcquireJob
(
pParam
->
refId
);
SSchJob
*
pJob
=
schAcquireJob
(
pParam
->
refId
);
if
(
NULL
==
pJob
)
{
if
(
NULL
==
pJob
)
{
qWarn
(
"QID:0x%"
PRIx64
",TID:0x%"
PRIx64
"taosAcquireRef job failed, may be dropped, refId:%"
PRIx64
,
qWarn
(
"QID:0x%"
PRIx64
",TID:0x%"
PRIx64
"taosAcquireRef job failed, may be dropped, refId:
0x
%"
PRIx64
,
pParam
->
queryId
,
pParam
->
taskId
,
pParam
->
refId
);
pParam
->
queryId
,
pParam
->
taskId
,
pParam
->
refId
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_JOB_FREED
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_JOB_FREED
);
}
}
...
@@ -443,7 +443,7 @@ int32_t schHandleExplainCallback(void *param, const SDataBuf *pMsg, int32_t code
...
@@ -443,7 +443,7 @@ int32_t schHandleExplainCallback(void *param, const SDataBuf *pMsg, int32_t code
int32_t
schHandleDropCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
int32_t
schHandleDropCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
SSchTaskCallbackParam
*
pParam
=
(
SSchTaskCallbackParam
*
)
param
;
SSchTaskCallbackParam
*
pParam
=
(
SSchTaskCallbackParam
*
)
param
;
qDebug
(
"QID:
%"
PRIx64
",TID:%"
PRIx64
" drop task rsp received, code:
%x"
,
pParam
->
queryId
,
pParam
->
taskId
,
code
);
qDebug
(
"QID:
0x%"
PRIx64
",TID:0x%"
PRIx64
" drop task rsp received, code:0x
%x"
,
pParam
->
queryId
,
pParam
->
taskId
,
code
);
taosMemoryFreeClear
(
param
);
taosMemoryFreeClear
(
param
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
...
source/libs/scheduler/src/schUtil.c
浏览文件 @
d3203263
...
@@ -200,7 +200,7 @@ int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) {
...
@@ -200,7 +200,7 @@ int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) {
SCH_UNLOCK
(
SCH_WRITE
,
&
hb
->
lock
);
SCH_UNLOCK
(
SCH_WRITE
,
&
hb
->
lock
);
SCH_UNLOCK
(
SCH_READ
,
&
schMgmt
.
hbLock
);
SCH_UNLOCK
(
SCH_READ
,
&
schMgmt
.
hbLock
);
qDebug
(
"hb connection updated, sId:%"
PRIx64
", nodeId:%d, fqdn:%s, port:%d, pTrans:%p, pHandle:%p"
,
schMgmt
.
sId
,
qDebug
(
"hb connection updated, sId:
0x
%"
PRIx64
", nodeId:%d, fqdn:%s, port:%d, pTrans:%p, pHandle:%p"
,
schMgmt
.
sId
,
epId
->
nodeId
,
epId
->
ep
.
fqdn
,
epId
->
ep
.
port
,
trans
->
pTrans
,
trans
->
pHandle
);
epId
->
nodeId
,
epId
->
ep
.
fqdn
,
epId
->
ep
.
port
,
trans
->
pTrans
,
trans
->
pHandle
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
d3203263
...
@@ -62,12 +62,14 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
...
@@ -62,12 +62,14 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
SCH_ERR_RET
(
TSDB_CODE_QRY_SYS_ERROR
);
SCH_ERR_RET
(
TSDB_CODE_QRY_SYS_ERROR
);
}
}
qInfo
(
"scheduler %"
PRIx64
" initizlized, maxJob:%u"
,
schMgmt
.
sId
,
schMgmt
.
cfg
.
maxJobNum
);
qInfo
(
"scheduler
0x
%"
PRIx64
" initizlized, maxJob:%u"
,
schMgmt
.
sId
,
schMgmt
.
cfg
.
maxJobNum
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
schedulerExecJob
(
SSchedulerReq
*
pReq
,
int64_t
*
pJob
,
SQueryResult
*
pRes
)
{
int32_t
schedulerExecJob
(
SSchedulerReq
*
pReq
,
int64_t
*
pJob
,
SQueryResult
*
pRes
)
{
qDebug
(
"scheduler sync exec job start"
);
if
(
NULL
==
pReq
||
NULL
==
pJob
||
NULL
==
pRes
)
{
if
(
NULL
==
pReq
||
NULL
==
pJob
||
NULL
==
pRes
)
{
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
}
...
@@ -76,6 +78,8 @@ int32_t schedulerExecJob(SSchedulerReq *pReq, int64_t *pJob, SQueryResult *pRes)
...
@@ -76,6 +78,8 @@ int32_t schedulerExecJob(SSchedulerReq *pReq, int64_t *pJob, SQueryResult *pRes)
}
}
int32_t
schedulerAsyncExecJob
(
SSchedulerReq
*
pReq
,
int64_t
*
pJob
)
{
int32_t
schedulerAsyncExecJob
(
SSchedulerReq
*
pReq
,
int64_t
*
pJob
)
{
qDebug
(
"scheduler async exec job start"
);
int32_t
code
=
0
;
int32_t
code
=
0
;
if
(
NULL
==
pReq
||
NULL
==
pJob
)
{
if
(
NULL
==
pReq
||
NULL
==
pJob
)
{
SCH_ERR_JRET
(
TSDB_CODE_QRY_INVALID_INPUT
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_INVALID_INPUT
);
...
@@ -93,6 +97,8 @@ _return:
...
@@ -93,6 +97,8 @@ _return:
}
}
int32_t
schedulerFetchRows
(
int64_t
job
,
void
**
pData
)
{
int32_t
schedulerFetchRows
(
int64_t
job
,
void
**
pData
)
{
qDebug
(
"scheduler sync fetch rows start"
);
if
(
NULL
==
pData
)
{
if
(
NULL
==
pData
)
{
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
}
...
@@ -115,6 +121,8 @@ int32_t schedulerFetchRows(int64_t job, void **pData) {
...
@@ -115,6 +121,8 @@ int32_t schedulerFetchRows(int64_t job, void **pData) {
}
}
void
schedulerAsyncFetchRows
(
int64_t
job
,
schedulerFetchCallback
fp
,
void
*
param
)
{
void
schedulerAsyncFetchRows
(
int64_t
job
,
schedulerFetchCallback
fp
,
void
*
param
)
{
qDebug
(
"scheduler async fetch rows start"
);
int32_t
code
=
0
;
int32_t
code
=
0
;
if
(
NULL
==
fp
||
NULL
==
param
)
{
if
(
NULL
==
fp
||
NULL
==
param
)
{
SCH_ERR_JRET
(
TSDB_CODE_QRY_INVALID_INPUT
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_INVALID_INPUT
);
...
@@ -146,12 +154,12 @@ int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) {
...
@@ -146,12 +154,12 @@ int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) {
int32_t
code
=
0
;
int32_t
code
=
0
;
SSchJob
*
pJob
=
schAcquireJob
(
job
);
SSchJob
*
pJob
=
schAcquireJob
(
job
);
if
(
NULL
==
pJob
)
{
if
(
NULL
==
pJob
)
{
qDebug
(
"acquire job from jobRef list failed, may not started or dropped, refId:%"
PRIx64
,
job
);
qDebug
(
"acquire job from jobRef list failed, may not started or dropped, refId:
0x
%"
PRIx64
,
job
);
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
}
if
(
pJob
->
status
<
JOB_TASK_STATUS_NOT_START
||
pJob
->
levelNum
<=
0
||
NULL
==
pJob
->
levels
)
{
if
(
pJob
->
status
<
JOB_TASK_STATUS_NOT_START
||
pJob
->
levelNum
<=
0
||
NULL
==
pJob
->
levels
)
{
qDebug
(
"job not initialized or not executable job, refId:%"
PRIx64
,
job
);
qDebug
(
"job not initialized or not executable job, refId:
0x
%"
PRIx64
,
job
);
SCH_ERR_JRET
(
TSDB_CODE_SCH_STATUS_ERROR
);
SCH_ERR_JRET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
}
...
@@ -206,14 +214,14 @@ void schedulerFreeJob(int64_t job, int32_t errCode) {
...
@@ -206,14 +214,14 @@ void schedulerFreeJob(int64_t job, int32_t errCode) {
int32_t
code
=
schProcessOnJobDropped
(
pJob
,
errCode
);
int32_t
code
=
schProcessOnJobDropped
(
pJob
,
errCode
);
if
(
TSDB_CODE_SCH_JOB_IS_DROPPING
==
code
)
{
if
(
TSDB_CODE_SCH_JOB_IS_DROPPING
==
code
)
{
SCH_JOB_DLOG
(
"sch job is already dropping, refId:%"
PRIx64
,
job
);
SCH_JOB_DLOG
(
"sch job is already dropping, refId:
0x
%"
PRIx64
,
job
);
return
;
return
;
}
}
SCH_JOB_DLOG
(
"start to remove job from jobRef list, refId:%"
PRIx64
,
job
);
SCH_JOB_DLOG
(
"start to remove job from jobRef list, refId:
0x
%"
PRIx64
,
job
);
if
(
taosRemoveRef
(
schMgmt
.
jobRef
,
job
))
{
if
(
taosRemoveRef
(
schMgmt
.
jobRef
,
job
))
{
SCH_JOB_ELOG
(
"remove job from job list failed, refId:%"
PRIx64
,
job
);
SCH_JOB_ELOG
(
"remove job from job list failed, refId:
0x
%"
PRIx64
,
job
);
}
}
schReleaseJob
(
job
);
schReleaseJob
(
job
);
...
...
tools/shell/src/shellEngine.c
浏览文件 @
d3203263
...
@@ -855,8 +855,7 @@ void shellGetGrantInfo() {
...
@@ -855,8 +855,7 @@ void shellGetGrantInfo() {
if
(
code
==
TSDB_CODE_OPS_NOT_SUPPORT
)
{
if
(
code
==
TSDB_CODE_OPS_NOT_SUPPORT
)
{
fprintf
(
stdout
,
"Server is Community Edition, %s
\n\n
"
,
sinfo
);
fprintf
(
stdout
,
"Server is Community Edition, %s
\n\n
"
,
sinfo
);
}
else
{
}
else
{
fprintf
(
stderr
,
"Failed to check Server Edition, Reason:0x%04x:%s
\n\n
"
,
taos_errno
(
shell
.
conn
),
fprintf
(
stderr
,
"Failed to check Server Edition, Reason:0x%04x:%s
\n\n
"
,
code
,
taos_errstr
(
tres
));
taos_errstr
(
shell
.
conn
));
}
}
return
;
return
;
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录