Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
496ae4ca
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
496ae4ca
编写于
5月 31, 2022
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
schedule based on qnode's load
上级
5ce7dd2a
变更
45
隐藏空白更改
内联
并排
Showing
45 changed file
with
549 addition
and
181 deletion
+549
-181
include/common/tmsg.h
include/common/tmsg.h
+32
-7
include/common/tmsgdef.h
include/common/tmsgdef.h
+1
-0
include/dnode/qnode/qnode.h
include/dnode/qnode/qnode.h
+0
-14
include/libs/executor/dataSinkMgt.h
include/libs/executor/dataSinkMgt.h
+8
-0
include/libs/monitor/monitor.h
include/libs/monitor/monitor.h
+5
-0
include/libs/qworker/qworker.h
include/libs/qworker/qworker.h
+14
-8
source/client/inc/clientInt.h
source/client/inc/clientInt.h
+4
-1
source/client/src/clientHb.c
source/client/src/clientHb.c
+4
-0
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+63
-13
source/common/src/tmsg.c
source/common/src/tmsg.c
+62
-10
source/dnode/mgmt/mgmt_dnode/inc/dmInt.h
source/dnode/mgmt/mgmt_dnode/inc/dmInt.h
+2
-1
source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
+2
-0
source/dnode/mgmt/mgmt_dnode/src/dmInt.c
source/dnode/mgmt/mgmt_dnode/src/dmInt.c
+1
-0
source/dnode/mgmt/mgmt_qnode/src/qmHandle.c
source/dnode/mgmt/mgmt_qnode/src/qmHandle.c
+8
-0
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
+1
-1
source/dnode/mgmt/node_mgmt/inc/dmMgmt.h
source/dnode/mgmt/node_mgmt/inc/dmMgmt.h
+1
-0
source/dnode/mgmt/node_mgmt/inc/dmNodes.h
source/dnode/mgmt/node_mgmt/inc/dmNodes.h
+1
-0
source/dnode/mgmt/node_mgmt/src/dmEnv.c
source/dnode/mgmt/node_mgmt/src/dmEnv.c
+1
-0
source/dnode/mgmt/node_mgmt/src/dmMonitor.c
source/dnode/mgmt/node_mgmt/src/dmMonitor.c
+14
-0
source/dnode/mgmt/node_mgmt/src/dmTransport.c
source/dnode/mgmt/node_mgmt/src/dmTransport.c
+1
-1
source/dnode/mgmt/node_util/inc/dmUtil.h
source/dnode/mgmt/node_util/inc/dmUtil.h
+4
-1
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+1
-0
source/dnode/mnode/impl/inc/mndQnode.h
source/dnode/mnode/impl/inc/mndQnode.h
+6
-0
source/dnode/mnode/impl/src/mndDnode.c
source/dnode/mnode/impl/src/mndDnode.c
+7
-0
source/dnode/mnode/impl/src/mndProfile.c
source/dnode/mnode/impl/src/mndProfile.c
+4
-0
source/dnode/mnode/impl/src/mndQnode.c
source/dnode/mnode/impl/src/mndQnode.c
+40
-27
source/dnode/qnode/src/qnode.c
source/dnode/qnode/src/qnode.c
+17
-5
source/libs/catalog/inc/catalogInt.h
source/libs/catalog/inc/catalogInt.h
+7
-7
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+3
-3
source/libs/catalog/src/ctgCache.c
source/libs/catalog/src/ctgCache.c
+19
-19
source/libs/catalog/src/ctgRemote.c
source/libs/catalog/src/ctgRemote.c
+1
-1
source/libs/catalog/src/ctgUtil.c
source/libs/catalog/src/ctgUtil.c
+4
-4
source/libs/executor/inc/dataSinkInt.h
source/libs/executor/inc/dataSinkInt.h
+2
-0
source/libs/executor/src/dataDispatcher.c
source/libs/executor/src/dataDispatcher.c
+23
-1
source/libs/executor/src/dataSinkMgt.c
source/libs/executor/src/dataSinkMgt.c
+14
-0
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+2
-0
source/libs/monitor/src/monMsg.c
source/libs/monitor/src/monMsg.c
+47
-1
source/libs/planner/src/planPhysiCreater.c
source/libs/planner/src/planPhysiCreater.c
+8
-4
source/libs/qcom/src/querymsg.c
source/libs/qcom/src/querymsg.c
+1
-1
source/libs/qworker/inc/qwInt.h
source/libs/qworker/inc/qwInt.h
+28
-8
source/libs/qworker/src/qwMsg.c
source/libs/qworker/src/qwMsg.c
+16
-7
source/libs/qworker/src/qwUtil.c
source/libs/qworker/src/qwUtil.c
+11
-10
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+24
-3
source/libs/scheduler/src/schJob.c
source/libs/scheduler/src/schJob.c
+29
-21
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+6
-2
未找到文件。
include/common/tmsg.h
浏览文件 @
496ae4ca
...
...
@@ -791,19 +791,24 @@ typedef struct {
int32_t
tSerializeSQnodeListReq
(
void
*
buf
,
int32_t
bufLen
,
SQnodeListReq
*
pReq
);
int32_t
tDeserializeSQnodeListReq
(
void
*
buf
,
int32_t
bufLen
,
SQnodeListReq
*
pReq
);
typedef
struct
SQueryNodeAddr
{
int32_t
nodeId
;
// vgId or qnodeId
SEpSet
epSet
;
}
SQueryNodeAddr
;
typedef
struct
{
SArray
*
addrsList
;
// SArray<SQueryNodeAddr>
SQueryNodeAddr
addr
;
uint64_t
load
;
}
SQueryNodeLoad
;
typedef
struct
{
SArray
*
qnodeList
;
// SArray<SQueryNodeLoad>
}
SQnodeListRsp
;
int32_t
tSerializeSQnodeListRsp
(
void
*
buf
,
int32_t
bufLen
,
SQnodeListRsp
*
pRsp
);
int32_t
tDeserializeSQnodeListRsp
(
void
*
buf
,
int32_t
bufLen
,
SQnodeListRsp
*
pRsp
);
void
tFreeSQnodeListRsp
(
SQnodeListRsp
*
pRsp
);
typedef
struct
SQueryNodeAddr
{
int32_t
nodeId
;
// vgId or qnodeId
SEpSet
epSet
;
}
SQueryNodeAddr
;
typedef
struct
{
SArray
*
pArray
;
// Array of SUseDbRsp
}
SUseDbBatchRsp
;
...
...
@@ -926,6 +931,21 @@ typedef struct {
int32_t
syncState
;
}
SMnodeLoad
;
typedef
struct
{
int32_t
dnodeId
;
int64_t
numOfProcessedQuery
;
int64_t
numOfProcessedCQuery
;
int64_t
numOfProcessedFetch
;
int64_t
numOfProcessedDrop
;
int64_t
numOfProcessedHb
;
int64_t
cacheDataSize
;
int64_t
numOfQueryInQueue
;
int64_t
numOfFetchInQueue
;
int64_t
timeInQueryQueue
;
int64_t
timeInFetchQueue
;
}
SQnodeLoad
;
typedef
struct
{
int32_t
sver
;
// software version
int64_t
dnodeVer
;
// dnode table version in sdb
...
...
@@ -937,6 +957,7 @@ typedef struct {
int32_t
numOfSupportVnodes
;
char
dnodeEp
[
TSDB_EP_LEN
];
SMnodeLoad
mload
;
SQnodeLoad
qload
;
SClusterCfg
clusterCfg
;
SArray
*
pVloads
;
// array of SVnodeLoad
}
SStatusReq
;
...
...
@@ -1937,6 +1958,7 @@ typedef struct {
int8_t
killConnection
;
int8_t
align
[
3
];
SEpSet
epSet
;
SArray
*
pQnodeList
;
}
SQueryHbRspBasic
;
typedef
struct
{
...
...
@@ -2016,7 +2038,10 @@ static FORCE_INLINE void tFreeClientKv(void* pKv) {
static
FORCE_INLINE
void
tFreeClientHbRsp
(
void
*
pRsp
)
{
SClientHbRsp
*
rsp
=
(
SClientHbRsp
*
)
pRsp
;
taosMemoryFreeClear
(
rsp
->
query
);
if
(
rsp
->
query
)
{
taosArrayDestroy
(
rsp
->
query
->
pQnodeList
);
taosMemoryFreeClear
(
rsp
->
query
);
}
if
(
rsp
->
info
)
taosArrayDestroyEx
(
rsp
->
info
,
tFreeClientKv
);
}
...
...
include/common/tmsgdef.h
浏览文件 @
496ae4ca
...
...
@@ -253,6 +253,7 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_MON_BM_INFO
,
"monitor-binfo"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MON_VM_LOAD
,
"monitor-vload"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MON_MM_LOAD
,
"monitor-mload"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MON_QM_LOAD
,
"monitor-qload"
,
NULL
,
NULL
)
#if defined(TD_MSG_NUMBER_)
TDMT_MAX
...
...
include/dnode/qnode/qnode.h
浏览文件 @
496ae4ca
...
...
@@ -25,20 +25,6 @@ extern "C" {
/* ------------------------ TYPES EXPOSED ------------------------ */
typedef
struct
SQnode
SQnode
;
typedef
struct
{
int64_t
numOfProcessedQuery
;
int64_t
numOfProcessedCQuery
;
int64_t
numOfProcessedFetch
;
int64_t
numOfProcessedDrop
;
int64_t
memSizeInCache
;
int64_t
dataSizeSend
;
int64_t
dataSizeRecv
;
int64_t
numOfQueryInQueue
;
int64_t
numOfFetchInQueue
;
int64_t
waitTimeInQueryQUeue
;
int64_t
waitTimeInFetchQUeue
;
}
SQnodeLoad
;
typedef
struct
{
SMsgCb
msgCb
;
}
SQnodeOpt
;
...
...
include/libs/executor/dataSinkMgt.h
浏览文件 @
496ae4ca
...
...
@@ -32,6 +32,10 @@ extern "C" {
struct
SDataSink
;
struct
SSDataBlock
;
typedef
struct
SDataSinkStat
{
uint64_t
cachedSize
;
}
SDataSinkStat
;
typedef
struct
SDataSinkMgtCfg
{
uint32_t
maxDataBlockNum
;
// todo: this should be numOfRows?
uint32_t
maxDataBlockNumPerQuery
;
...
...
@@ -62,6 +66,8 @@ typedef struct SOutputData {
*/
int32_t
dsCreateDataSinker
(
const
SDataSinkNode
*
pDataSink
,
DataSinkHandle
*
pHandle
);
int32_t
dsDataSinkGetCacheSize
(
SDataSinkStat
*
pStat
);
/**
* Put the result set returned by the executor into datasinker.
* @param handle
...
...
@@ -88,6 +94,8 @@ void dsGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd);
*/
int32_t
dsGetDataBlock
(
DataSinkHandle
handle
,
SOutputData
*
pOutput
);
int32_t
dsGetCacheSize
(
DataSinkHandle
handle
,
uint64_t
*
pSize
);
/**
* After dsGetStatus returns DS_NEED_SCHEDULE, the caller need to put this into the work queue.
* @param ahandle
...
...
include/libs/monitor/monitor.h
浏览文件 @
496ae4ca
...
...
@@ -171,6 +171,7 @@ void tFreeSMonVmInfo(SMonVmInfo *pInfo);
typedef
struct
{
SMonSysInfo
sys
;
SMonLogs
log
;
SQnodeLoad
load
;
}
SMonQmInfo
;
int32_t
tSerializeSMonQmInfo
(
void
*
buf
,
int32_t
bufLen
,
SMonQmInfo
*
pInfo
);
...
...
@@ -210,6 +211,10 @@ typedef struct {
int32_t
tSerializeSMonMloadInfo
(
void
*
buf
,
int32_t
bufLen
,
SMonMloadInfo
*
pInfo
);
int32_t
tDeserializeSMonMloadInfo
(
void
*
buf
,
int32_t
bufLen
,
SMonMloadInfo
*
pInfo
);
int32_t
tSerializeSQnodeLoad
(
void
*
buf
,
int32_t
bufLen
,
SQnodeLoad
*
pInfo
);
int32_t
tDeserializeSQnodeLoad
(
void
*
buf
,
int32_t
bufLen
,
SQnodeLoad
*
pInfo
);
typedef
struct
{
const
char
*
server
;
uint16_t
port
;
...
...
include/libs/qworker/qworker.h
浏览文件 @
496ae4ca
...
...
@@ -22,7 +22,7 @@ extern "C" {
#include "tmsgcb.h"
#include "trpc.h"
#include "executor.h"
enum
{
NODE_TYPE_VNODE
=
1
,
...
...
@@ -40,13 +40,19 @@ typedef struct SQWorkerCfg {
}
SQWorkerCfg
;
typedef
struct
{
uint64_t
numOfStartTask
;
uint64_t
numOfStopTask
;
uint64_t
numOfRecvedFetch
;
uint64_t
numOfSentHb
;
uint64_t
numOfSentFetch
;
uint64_t
numOfTaskInQueue
;
uint64_t
cacheDataSize
;
uint64_t
queryProcessed
;
uint64_t
cqueryProcessed
;
uint64_t
fetchProcessed
;
uint64_t
dropProcessed
;
uint64_t
hbProcessed
;
uint64_t
numOfQueryInQueue
;
uint64_t
numOfFetchInQueue
;
uint64_t
timeInQueryQueue
;
uint64_t
timeInFetchQueue
;
uint64_t
numOfErrors
;
}
SQWorkerStat
;
...
...
@@ -68,7 +74,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_
void
qWorkerDestroy
(
void
**
qWorkerMgmt
);
int
64_t
qWorkerGetWaitTimeInQueue
(
void
*
qWorkerMgmt
,
EQueueType
type
);
int
32_t
qWorkerGetStat
(
SReadHandle
*
handle
,
void
*
qWorkerMgmt
,
SQWorkerStat
*
pStat
);
#ifdef __cplusplus
}
...
...
source/client/inc/clientInt.h
浏览文件 @
496ae4ca
...
...
@@ -119,6 +119,8 @@ typedef struct SHeartBeatInfo {
struct
SAppInstInfo
{
int64_t
numOfConns
;
SCorEpSet
mgmtEp
;
TdThreadMutex
qnodeMutex
;
SArray
*
pQnodeList
;
SInstanceSummary
summary
;
SList
*
pConnList
;
// STscObj linked list
uint64_t
clusterId
;
...
...
@@ -290,7 +292,7 @@ SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen);
int32_t
parseSql
(
SRequestObj
*
pRequest
,
bool
topicQuery
,
SQuery
**
pQuery
,
SStmtCallback
*
pStmtCb
);
int32_t
getPlan
(
SRequestObj
*
pRequest
,
SQuery
*
pQuery
,
SQueryPlan
**
pPlan
,
SArray
*
pNodeList
);
int32_t
getPlan
(
SRequestObj
*
pRequest
,
SQuery
*
pQuery
,
SQueryPlan
**
pPlan
,
SArray
*
*
pNodeList
);
int32_t
buildRequest
(
STscObj
*
pTscObj
,
const
char
*
sql
,
int
sqlLen
,
SRequestObj
**
pRequest
);
...
...
@@ -317,6 +319,7 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code
int32_t
getQueryPlan
(
SRequestObj
*
pRequest
,
SQuery
*
pQuery
,
SArray
**
pNodeList
);
int32_t
scheduleQuery
(
SRequestObj
*
pRequest
,
SQueryPlan
*
pDag
,
SArray
*
pNodeList
,
void
**
res
);
int32_t
refreshMeta
(
STscObj
*
pTscObj
,
SRequestObj
*
pRequest
);
int32_t
updateQnodeList
(
SAppInstInfo
*
pInfo
,
SArray
*
pNodeList
);
#ifdef __cplusplus
}
...
...
source/client/src/clientHb.c
浏览文件 @
496ae4ca
...
...
@@ -160,6 +160,10 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
taos_close
(
pTscObj
);
}
if
(
pRsp
->
query
->
pQnodeList
)
{
updateQnodeList
(
pTscObj
->
pAppInfo
,
pRsp
->
query
->
pQnodeList
);
}
releaseTscObj
(
pRsp
->
connKey
.
tscRid
);
}
}
...
...
source/client/src/clientImpl.c
浏览文件 @
496ae4ca
...
...
@@ -117,7 +117,8 @@ TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass,
SAppInstInfo
*
p
=
NULL
;
if
(
pInst
==
NULL
)
{
p
=
taosMemoryCalloc
(
1
,
sizeof
(
struct
SAppInstInfo
));
p
->
mgmtEp
=
epSet
;
p
->
mgmtEp
=
epSet
;
taosThreadMutexInit
(
&
p
->
qnodeMutex
,
NULL
);
p
->
pTransporter
=
openTransporter
(
user
,
secretEncrypt
,
tsNumOfCores
);
p
->
pAppHbMgr
=
appHbMgrInit
(
p
,
key
);
taosHashPut
(
appInfo
.
pInstMap
,
key
,
strlen
(
key
),
&
p
,
POINTER_BYTES
);
...
...
@@ -228,7 +229,61 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
getPlan
(
SRequestObj
*
pRequest
,
SQuery
*
pQuery
,
SQueryPlan
**
pPlan
,
SArray
*
pNodeList
)
{
int
compareQueryNodeLoad
(
const
void
*
elem1
,
const
void
*
elem2
)
{
SQueryNodeLoad
*
node1
=
(
SQueryNodeLoad
*
)
elem1
;
SQueryNodeLoad
*
node2
=
(
SQueryNodeLoad
*
)
elem2
;
if
(
node1
->
load
<
node2
->
load
)
{
return
-
1
;
}
return
node1
->
load
>
node2
->
load
;
}
int32_t
updateQnodeList
(
SAppInstInfo
*
pInfo
,
SArray
*
pNodeList
)
{
taosThreadMutexLock
(
&
pInfo
->
qnodeMutex
);
if
(
pInfo
->
pQnodeList
)
{
taosArrayDestroy
(
pInfo
->
pQnodeList
);
pInfo
->
pQnodeList
=
NULL
;
}
if
(
pNodeList
)
{
pInfo
->
pQnodeList
=
taosArrayDup
(
pNodeList
);
taosArraySort
(
pInfo
->
pQnodeList
,
compareQueryNodeLoad
);
}
taosThreadMutexUnlock
(
&
pInfo
->
qnodeMutex
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
getQnodeList
(
SRequestObj
*
pRequest
,
SArray
**
pNodeList
)
{
SAppInstInfo
*
pInfo
=
pRequest
->
pTscObj
->
pAppInfo
;
int32_t
code
=
0
;
taosThreadMutexLock
(
&
pInfo
->
qnodeMutex
);
if
(
pInfo
->
pQnodeList
)
{
*
pNodeList
=
taosArrayDup
(
pInfo
->
pQnodeList
);
}
taosThreadMutexUnlock
(
&
pInfo
->
qnodeMutex
);
if
(
NULL
==
*
pNodeList
)
{
SEpSet
mgmtEpSet
=
getEpSet_s
(
&
pRequest
->
pTscObj
->
pAppInfo
->
mgmtEp
);
SCatalog
*
pCatalog
=
NULL
;
code
=
catalogGetHandle
(
pRequest
->
pTscObj
->
pAppInfo
->
clusterId
,
&
pCatalog
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pNodeList
=
taosArrayInit
(
5
,
sizeof
(
SQueryNodeLoad
));
code
=
catalogGetQnodeList
(
pCatalog
,
pRequest
->
pTscObj
->
pAppInfo
->
pTransporter
,
&
mgmtEpSet
,
*
pNodeList
);
}
if
(
TSDB_CODE_SUCCESS
==
code
&&
*
pNodeList
)
{
code
=
updateQnodeList
(
pInfo
,
*
pNodeList
);
}
}
return
code
;
}
int32_t
getPlan
(
SRequestObj
*
pRequest
,
SQuery
*
pQuery
,
SQueryPlan
**
pPlan
,
SArray
**
pNodeList
)
{
pRequest
->
type
=
pQuery
->
msgType
;
SPlanContext
cxt
=
{.
queryId
=
pRequest
->
requestId
,
.
acctId
=
pRequest
->
pTscObj
->
acctId
,
...
...
@@ -237,14 +292,10 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra
.
showRewrite
=
pQuery
->
showRewrite
,
.
pMsg
=
pRequest
->
msgBuf
,
.
msgLen
=
ERROR_MSG_BUF_DEFAULT_SIZE
};
SEpSet
mgmtEpSet
=
getEpSet_s
(
&
pRequest
->
pTscObj
->
pAppInfo
->
mgmtEp
);
SCatalog
*
pCatalog
=
NULL
;
int32_t
code
=
catalogGetHandle
(
pRequest
->
pTscObj
->
pAppInfo
->
clusterId
,
&
pCatalog
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
catalogGetQnodeList
(
pCatalog
,
pRequest
->
pTscObj
->
pAppInfo
->
pTransporter
,
&
mgmtEpSet
,
pNodeList
);
}
int32_t
code
=
getQnodeList
(
pRequest
,
pNodeList
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
qCreateQueryPlan
(
&
cxt
,
pPlan
,
pNodeList
);
code
=
qCreateQueryPlan
(
&
cxt
,
pPlan
,
*
pNodeList
);
}
return
code
;
}
...
...
@@ -369,8 +420,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
}
int32_t
getQueryPlan
(
SRequestObj
*
pRequest
,
SQuery
*
pQuery
,
SArray
**
pNodeList
)
{
*
pNodeList
=
taosArrayInit
(
4
,
sizeof
(
struct
SQueryNodeAddr
));
return
getPlan
(
pRequest
,
pQuery
,
&
pRequest
->
body
.
pDag
,
*
pNodeList
);
return
getPlan
(
pRequest
,
pQuery
,
&
pRequest
->
body
.
pDag
,
pNodeList
);
}
int32_t
validateSversion
(
SRequestObj
*
pRequest
,
void
*
res
)
{
...
...
@@ -456,8 +506,8 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code
code
=
execDdlQuery
(
pRequest
,
pQuery
);
break
;
case
QUERY_EXEC_MODE_SCHEDULE
:
{
SArray
*
pNodeList
=
taosArrayInit
(
4
,
sizeof
(
struct
SQueryNodeAddr
))
;
code
=
getPlan
(
pRequest
,
pQuery
,
&
pRequest
->
body
.
pDag
,
pNodeList
);
SArray
*
pNodeList
=
NULL
;
code
=
getPlan
(
pRequest
,
pQuery
,
&
pRequest
->
body
.
pDag
,
&
pNodeList
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
scheduleQuery
(
pRequest
,
pRequest
->
body
.
pDag
,
pNodeList
,
&
pRes
);
if
(
NULL
!=
pRes
)
{
...
...
source/common/src/tmsg.c
浏览文件 @
496ae4ca
...
...
@@ -147,12 +147,25 @@ int32_t tEncodeSQueryNodeAddr(SEncoder *pEncoder, SQueryNodeAddr *pAddr) {
return
0
;
}
int32_t
tEncodeSQueryNodeLoad
(
SEncoder
*
pEncoder
,
SQueryNodeLoad
*
pLoad
)
{
if
(
tEncodeSQueryNodeAddr
(
pEncoder
,
&
pLoad
->
addr
)
<
0
)
return
-
1
;
if
(
tEncodeU64
(
pEncoder
,
pLoad
->
load
)
<
0
)
return
-
1
;
return
0
;
}
int32_t
tDecodeSQueryNodeAddr
(
SDecoder
*
pDecoder
,
SQueryNodeAddr
*
pAddr
)
{
if
(
tDecodeI32
(
pDecoder
,
&
pAddr
->
nodeId
)
<
0
)
return
-
1
;
if
(
tDecodeSEpSet
(
pDecoder
,
&
pAddr
->
epSet
)
<
0
)
return
-
1
;
return
0
;
}
int32_t
tDecodeSQueryNodeLoad
(
SDecoder
*
pDecoder
,
SQueryNodeLoad
*
pLoad
)
{
if
(
tDecodeSQueryNodeAddr
(
pDecoder
,
&
pLoad
->
addr
)
<
0
)
return
-
1
;
if
(
tDecodeU64
(
pDecoder
,
&
pLoad
->
load
)
<
0
)
return
-
1
;
return
0
;
}
int32_t
taosEncodeSEpSet
(
void
**
buf
,
const
SEpSet
*
pEp
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI8
(
buf
,
pEp
->
inUse
);
...
...
@@ -304,6 +317,12 @@ static int32_t tSerializeSClientHbRsp(SEncoder *pEncoder, const SClientHbRsp *pR
if
(
tEncodeI32
(
pEncoder
,
pRsp
->
query
->
onlineDnodes
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pRsp
->
query
->
killConnection
)
<
0
)
return
-
1
;
if
(
tEncodeSEpSet
(
pEncoder
,
&
pRsp
->
query
->
epSet
)
<
0
)
return
-
1
;
int32_t
num
=
taosArrayGetSize
(
pRsp
->
query
->
pQnodeList
);
if
(
tEncodeI32
(
pEncoder
,
num
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
SQueryNodeLoad
*
pLoad
=
taosArrayGet
(
pRsp
->
query
->
pQnodeList
,
i
);
if
(
tEncodeSQueryNodeLoad
(
pEncoder
,
pLoad
)
<
0
)
return
-
1
;
}
}
else
{
if
(
tEncodeI32
(
pEncoder
,
queryNum
)
<
0
)
return
-
1
;
}
...
...
@@ -333,6 +352,15 @@ static int32_t tDeserializeSClientHbRsp(SDecoder *pDecoder, SClientHbRsp *pRsp)
if
(
tDecodeI32
(
pDecoder
,
&
pRsp
->
query
->
onlineDnodes
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pRsp
->
query
->
killConnection
)
<
0
)
return
-
1
;
if
(
tDecodeSEpSet
(
pDecoder
,
&
pRsp
->
query
->
epSet
)
<
0
)
return
-
1
;
int32_t
pQnodeNum
=
0
;
if
(
tDecodeI32
(
pDecoder
,
&
pQnodeNum
)
<
0
)
return
-
1
;
if
(
pQnodeNum
>
0
)
{
pRsp
->
query
->
pQnodeList
=
taosArrayInit
(
pQnodeNum
,
sizeof
(
SQueryNodeLoad
));
if
(
NULL
==
pRsp
->
query
->
pQnodeList
)
return
-
1
;
SQueryNodeLoad
load
=
{
0
};
if
(
tDecodeSQueryNodeLoad
(
pDecoder
,
&
load
)
<
0
)
return
-
1
;
taosArrayPush
(
pRsp
->
query
->
pQnodeList
,
&
load
);
}
}
int32_t
kvNum
=
0
;
...
...
@@ -898,6 +926,18 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
// mnode loads
if
(
tEncodeI32
(
&
encoder
,
pReq
->
mload
.
syncState
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
qload
.
dnodeId
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pReq
->
qload
.
numOfProcessedQuery
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pReq
->
qload
.
numOfProcessedCQuery
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pReq
->
qload
.
numOfProcessedFetch
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pReq
->
qload
.
numOfProcessedDrop
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pReq
->
qload
.
numOfProcessedHb
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pReq
->
qload
.
cacheDataSize
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pReq
->
qload
.
numOfQueryInQueue
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pReq
->
qload
.
numOfFetchInQueue
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pReq
->
qload
.
timeInQueryQueue
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pReq
->
qload
.
timeInFetchQueue
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
...
...
@@ -955,6 +995,18 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
mload
.
syncState
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
qload
.
dnodeId
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pReq
->
qload
.
numOfProcessedQuery
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pReq
->
qload
.
numOfProcessedCQuery
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pReq
->
qload
.
numOfProcessedFetch
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pReq
->
qload
.
numOfProcessedDrop
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pReq
->
qload
.
numOfProcessedHb
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pReq
->
qload
.
cacheDataSize
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pReq
->
qload
.
numOfQueryInQueue
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pReq
->
qload
.
numOfFetchInQueue
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pReq
->
qload
.
timeInQueryQueue
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pReq
->
qload
.
timeInFetchQueue
)
<
0
)
return
-
1
;
tEndDecode
(
&
decoder
);
tDecoderClear
(
&
decoder
);
return
0
;
...
...
@@ -1921,11 +1973,11 @@ int32_t tSerializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp)
tEncoderInit
(
&
encoder
,
buf
,
bufLen
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
int32_t
num
=
taosArrayGetSize
(
pRsp
->
addrs
List
);
int32_t
num
=
taosArrayGetSize
(
pRsp
->
qnode
List
);
if
(
tEncodeI32
(
&
encoder
,
num
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
SQueryNode
Addr
*
addr
=
taosArrayGet
(
pRsp
->
addrs
List
,
i
);
if
(
tEncodeSQueryNode
Addr
(
&
encoder
,
addr
)
<
0
)
return
-
1
;
SQueryNode
Load
*
pLoad
=
taosArrayGet
(
pRsp
->
qnode
List
,
i
);
if
(
tEncodeSQueryNode
Load
(
&
encoder
,
pLoad
)
<
0
)
return
-
1
;
}
tEndEncode
(
&
encoder
);
...
...
@@ -1941,15 +1993,15 @@ int32_t tDeserializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
int32_t
num
=
0
;
if
(
tDecodeI32
(
&
decoder
,
&
num
)
<
0
)
return
-
1
;
if
(
NULL
==
pRsp
->
addrs
List
)
{
pRsp
->
addrsList
=
taosArrayInit
(
num
,
sizeof
(
SQueryNodeAddr
));
if
(
NULL
==
pRsp
->
addrs
List
)
return
-
1
;
if
(
NULL
==
pRsp
->
qnode
List
)
{
pRsp
->
qnodeList
=
taosArrayInit
(
num
,
sizeof
(
SQueryNodeLoad
));
if
(
NULL
==
pRsp
->
qnode
List
)
return
-
1
;
}
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
SQueryNode
Addr
addr
=
{
0
};
if
(
tDecodeSQueryNode
Addr
(
&
decoder
,
&
addr
)
<
0
)
return
-
1
;
taosArrayPush
(
pRsp
->
addrsList
,
&
addr
);
SQueryNode
Load
load
=
{
0
};
if
(
tDecodeSQueryNode
Load
(
&
decoder
,
&
load
)
<
0
)
return
-
1
;
taosArrayPush
(
pRsp
->
qnodeList
,
&
load
);
}
tEndDecode
(
&
decoder
);
...
...
@@ -1957,7 +2009,7 @@ int32_t tDeserializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp
return
0
;
}
void
tFreeSQnodeListRsp
(
SQnodeListRsp
*
pRsp
)
{
taosArrayDestroy
(
pRsp
->
addrs
List
);
}
void
tFreeSQnodeListRsp
(
SQnodeListRsp
*
pRsp
)
{
taosArrayDestroy
(
pRsp
->
qnode
List
);
}
int32_t
tSerializeSCompactDbReq
(
void
*
buf
,
int32_t
bufLen
,
SCompactDbReq
*
pReq
)
{
SEncoder
encoder
=
{
0
};
...
...
source/dnode/mgmt/mgmt_dnode/inc/dmInt.h
浏览文件 @
496ae4ca
...
...
@@ -35,6 +35,7 @@ typedef struct SDnodeMgmt {
SendMonitorReportFp
sendMonitorReportFp
;
GetVnodeLoadsFp
getVnodeLoadsFp
;
GetMnodeLoadsFp
getMnodeLoadsFp
;
GetQnodeLoadsFp
getQnodeLoadsFp
;
}
SDnodeMgmt
;
// dmHandle.c
...
...
@@ -58,4 +59,4 @@ void dmStopWorker(SDnodeMgmt *pMgmt);
}
#endif
#endif
/*_TD_DND_QNODE_INT_H_*/
\ No newline at end of file
#endif
/*_TD_DND_QNODE_INT_H_*/
source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
浏览文件 @
496ae4ca
...
...
@@ -79,6 +79,8 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
(
*
pMgmt
->
getMnodeLoadsFp
)(
&
minfo
);
req
.
mload
=
minfo
.
load
;
(
*
pMgmt
->
getQnodeLoadsFp
)(
&
req
.
qload
);
int32_t
contLen
=
tSerializeSStatusReq
(
NULL
,
0
,
&
req
);
void
*
pHead
=
rpcMallocCont
(
contLen
);
tSerializeSStatusReq
(
pHead
,
contLen
,
&
req
);
...
...
source/dnode/mgmt/mgmt_dnode/src/dmInt.c
浏览文件 @
496ae4ca
...
...
@@ -48,6 +48,7 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt
->
sendMonitorReportFp
=
pInput
->
sendMonitorReportFp
;
pMgmt
->
getVnodeLoadsFp
=
pInput
->
getVnodeLoadsFp
;
pMgmt
->
getMnodeLoadsFp
=
pInput
->
getMnodeLoadsFp
;
pMgmt
->
getQnodeLoadsFp
=
pInput
->
getQnodeLoadsFp
;
if
(
dmStartWorker
(
pMgmt
)
!=
0
)
{
return
-
1
;
...
...
source/dnode/mgmt/mgmt_qnode/src/qmHandle.c
浏览文件 @
496ae4ca
...
...
@@ -20,6 +20,14 @@ void qmGetMonitorInfo(SQnodeMgmt *pMgmt, SMonQmInfo *qmInfo) {
SQnodeLoad
qload
=
{
0
};
qndGetLoad
(
pMgmt
->
pQnode
,
&
qload
);
qload
.
dnodeId
=
pMgmt
->
pData
->
dnodeId
;
}
void
qmGetQnodeLoads
(
SQnodeMgmt
*
pMgmt
,
SQnodeLoad
*
pInfo
)
{
qndGetLoad
(
pMgmt
->
pQnode
,
pInfo
);
pInfo
->
dnodeId
=
pMgmt
->
pData
->
dnodeId
;
}
int32_t
qmProcessGetMonitorInfoReq
(
SQnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
)
{
...
...
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
浏览文件 @
496ae4ca
...
...
@@ -104,7 +104,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
dTrace
(
"msg:%p, get from vnode-write queue"
,
pMsg
);
if
(
taosArrayPush
(
pArray
,
&
pMsg
)
==
NULL
)
{
dTrace
(
"msg:%p, failed to p
rocess
since %s"
,
pMsg
,
terrstr
());
dTrace
(
"msg:%p, failed to p
ush to array
since %s"
,
pMsg
,
terrstr
());
vmSendRsp
(
pMsg
,
TSDB_CODE_OUT_OF_MEMORY
);
}
}
...
...
source/dnode/mgmt/node_mgmt/inc/dmMgmt.h
浏览文件 @
496ae4ca
...
...
@@ -168,6 +168,7 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
void
dmSendMonitorReport
();
void
dmGetVnodeLoads
(
SMonVloadInfo
*
pInfo
);
void
dmGetMnodeLoads
(
SMonMloadInfo
*
pInfo
);
void
dmGetQnodeLoads
(
SQnodeLoad
*
pInfo
);
#ifdef __cplusplus
}
...
...
source/dnode/mgmt/node_mgmt/inc/dmNodes.h
浏览文件 @
496ae4ca
...
...
@@ -37,6 +37,7 @@ void bmGetMonitorInfo(void *pMgmt, SMonBmInfo *pInfo);
void
vmGetVnodeLoads
(
void
*
pMgmt
,
SMonVloadInfo
*
pInfo
);
void
mmGetMnodeLoads
(
void
*
pMgmt
,
SMonMloadInfo
*
pInfo
);
void
qmGetQnodeLoads
(
void
*
pMgmt
,
SQnodeLoad
*
pInfo
);
#ifdef __cplusplus
}
...
...
source/dnode/mgmt/node_mgmt/src/dmEnv.c
浏览文件 @
496ae4ca
...
...
@@ -178,6 +178,7 @@ SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) {
.
sendMonitorReportFp
=
dmSendMonitorReport
,
.
getVnodeLoadsFp
=
dmGetVnodeLoads
,
.
getMnodeLoadsFp
=
dmGetMnodeLoads
,
.
getQnodeLoadsFp
=
dmGetQnodeLoads
,
};
opt
.
msgCb
=
dmGetMsgcb
(
pWrapper
->
pDnode
);
...
...
source/dnode/mgmt/node_mgmt/src/dmMonitor.c
浏览文件 @
496ae4ca
...
...
@@ -170,3 +170,17 @@ void dmGetMnodeLoads(SMonMloadInfo *pInfo) {
dmReleaseWrapper
(
pWrapper
);
}
}
void
dmGetQnodeLoads
(
SQnodeLoad
*
pInfo
)
{
SDnode
*
pDnode
=
dmInstance
();
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
QNODE
];
if
(
dmMarkWrapper
(
pWrapper
)
==
0
)
{
if
(
tsMultiProcess
)
{
dmSendLocalRecv
(
pDnode
,
TDMT_MON_QM_LOAD
,
tDeserializeSQnodeLoad
,
pInfo
);
}
else
if
(
pWrapper
->
pMgmt
!=
NULL
)
{
qmGetQnodeLoads
(
pWrapper
->
pMgmt
,
pInfo
);
}
dmReleaseWrapper
(
pWrapper
);
}
}
source/dnode/mgmt/node_mgmt/src/dmTransport.c
浏览文件 @
496ae4ca
...
...
@@ -130,7 +130,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
_OVER:
if
(
code
!=
0
)
{
dError
(
"msg:%
p, failed to process since %s"
,
pMsg
,
terrstr
());
dError
(
"msg:%
s, failed to process since %s"
,
TMSG_INFO
(
pRpc
->
msgType
)
,
terrstr
());
if
(
terrno
!=
0
)
code
=
terrno
;
if
(
IsReq
(
pRpc
))
{
...
...
source/dnode/mgmt/node_util/inc/dmUtil.h
浏览文件 @
496ae4ca
...
...
@@ -34,6 +34,7 @@
#include "dnode.h"
#include "mnode.h"
#include "qnode.h"
#include "monitor.h"
#include "sync.h"
#include "wal.h"
...
...
@@ -92,6 +93,7 @@ typedef int32_t (*ProcessDropNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg);
typedef
void
(
*
SendMonitorReportFp
)();
typedef
void
(
*
GetVnodeLoadsFp
)(
SMonVloadInfo
*
pInfo
);
typedef
void
(
*
GetMnodeLoadsFp
)(
SMonMloadInfo
*
pInfo
);
typedef
void
(
*
GetQnodeLoadsFp
)(
SQnodeLoad
*
pInfo
);
typedef
struct
{
int32_t
dnodeId
;
...
...
@@ -118,6 +120,7 @@ typedef struct {
SendMonitorReportFp
sendMonitorReportFp
;
GetVnodeLoadsFp
getVnodeLoadsFp
;
GetMnodeLoadsFp
getMnodeLoadsFp
;
GetQnodeLoadsFp
getQnodeLoadsFp
;
}
SMgmtInputOpt
;
typedef
struct
{
...
...
@@ -180,4 +183,4 @@ void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet);
}
#endif
#endif
/*_TD_DM_INT_H_*/
\ No newline at end of file
#endif
/*_TD_DM_INT_H_*/
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
496ae4ca
...
...
@@ -216,6 +216,7 @@ typedef struct {
int64_t
createdTime
;
int64_t
updateTime
;
SDnodeObj
*
pDnode
;
SQnodeLoad
load
;
}
SQnodeObj
;
typedef
struct
{
...
...
source/dnode/mnode/impl/inc/mndQnode.h
浏览文件 @
496ae4ca
...
...
@@ -22,9 +22,15 @@
extern
"C"
{
#endif
#define QNODE_LOAD_VALUE(pQnode) (pQnode ? (pQnode->load.numOfQueryInQueue + pQnode->load.numOfFetchInQueue) : 0)
int32_t
mndInitQnode
(
SMnode
*
pMnode
);
void
mndCleanupQnode
(
SMnode
*
pMnode
);
SQnodeObj
*
mndAcquireQnode
(
SMnode
*
pMnode
,
int32_t
qnodeId
);
void
mndReleaseQnode
(
SMnode
*
pMnode
,
SQnodeObj
*
pObj
);
int32_t
mndCreateQnodeList
(
SMnode
*
pMnode
,
SArray
**
pList
,
int32_t
limit
);
#ifdef __cplusplus
}
#endif
...
...
source/dnode/mnode/impl/src/mndDnode.c
浏览文件 @
496ae4ca
...
...
@@ -17,6 +17,7 @@
#include "mndDnode.h"
#include "mndAuth.h"
#include "mndMnode.h"
#include "mndQnode.h"
#include "mndShow.h"
#include "mndTrans.h"
#include "mndUser.h"
...
...
@@ -388,6 +389,12 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
mndReleaseMnode
(
pMnode
,
pObj
);
}
SQnodeObj
*
pQnode
=
mndAcquireQnode
(
pMnode
,
statusReq
.
qload
.
dnodeId
);
if
(
pQnode
!=
NULL
)
{
pQnode
->
load
=
statusReq
.
qload
;
mndReleaseQnode
(
pMnode
,
pQnode
);
}
int64_t
curMs
=
taosGetTimestampMs
();
bool
online
=
mndIsDnodeOnline
(
pMnode
,
pDnode
,
curMs
);
bool
dnodeChanged
=
(
statusReq
.
dnodeVer
!=
sdbGetTableVer
(
pMnode
->
pSdb
,
SDB_DNODE
));
...
...
source/dnode/mnode/impl/src/mndProfile.c
浏览文件 @
496ae4ca
...
...
@@ -18,6 +18,7 @@
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndQnode.h"
#include "mndShow.h"
#include "mndStb.h"
#include "mndUser.h"
...
...
@@ -382,6 +383,9 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
rspBasic
->
totalDnodes
=
mndGetDnodeSize
(
pMnode
);
rspBasic
->
onlineDnodes
=
1
;
// TODO
mndGetMnodeEpSet
(
pMnode
,
&
rspBasic
->
epSet
);
mndCreateQnodeList
(
pMnode
,
&
rspBasic
->
pQnodeList
,
-
1
);
mndReleaseConn
(
pMnode
,
pConn
);
hbRsp
.
query
=
rspBasic
;
...
...
source/dnode/mnode/impl/src/mndQnode.c
浏览文件 @
496ae4ca
...
...
@@ -60,7 +60,7 @@ int32_t mndInitQnode(SMnode *pMnode) {
void
mndCleanupQnode
(
SMnode
*
pMnode
)
{}
static
SQnodeObj
*
mndAcquireQnode
(
SMnode
*
pMnode
,
int32_t
qnodeId
)
{
SQnodeObj
*
mndAcquireQnode
(
SMnode
*
pMnode
,
int32_t
qnodeId
)
{
SQnodeObj
*
pObj
=
sdbAcquire
(
pMnode
->
pSdb
,
SDB_QNODE
,
&
qnodeId
);
if
(
pObj
==
NULL
&&
terrno
==
TSDB_CODE_SDB_OBJ_NOT_THERE
)
{
terrno
=
TSDB_CODE_MND_QNODE_NOT_EXIST
;
...
...
@@ -68,7 +68,7 @@ static SQnodeObj *mndAcquireQnode(SMnode *pMnode, int32_t qnodeId) {
return
pObj
;
}
static
void
mndReleaseQnode
(
SMnode
*
pMnode
,
SQnodeObj
*
pObj
)
{
void
mndReleaseQnode
(
SMnode
*
pMnode
,
SQnodeObj
*
pObj
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbRelease
(
pSdb
,
pObj
);
}
...
...
@@ -429,49 +429,62 @@ _OVER:
return
code
;
}
static
int32_t
mndProcessQnodeListReq
(
SRpcMsg
*
pReq
)
{
int32_t
code
=
-
1
;
int32_t
numOfRows
=
0
;
SMnode
*
pMnode
=
pReq
->
info
.
node
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int32_t
mndCreateQnodeList
(
SMnode
*
pMnode
,
SArray
**
pList
,
int32_t
limit
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
void
*
pIter
=
NULL
;
SQnodeObj
*
pObj
=
NULL
;
SQnodeListReq
qlistReq
=
{
0
};
SQnodeListRsp
qlistRsp
=
{
0
};
if
(
tDeserializeSQnodeListReq
(
pReq
->
pCont
,
pReq
->
contLen
,
&
qlistReq
)
!=
0
)
{
mError
(
"failed to parse qnode list req"
);
terrno
=
TSDB_CODE_INVALID_MSG
;
goto
_OVER
;
}
int32_t
numOfRows
=
0
;
qlistRsp
.
addrsList
=
taosArrayInit
(
5
,
sizeof
(
SQueryNodeAddr
));
if
(
NULL
==
q
listRsp
.
addrs
List
)
{
SArray
*
qnodeList
=
taosArrayInit
(
5
,
sizeof
(
SQueryNodeLoad
));
if
(
NULL
==
q
node
List
)
{
mError
(
"failed to alloc epSet while process qnode list req"
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_OVER
;
return
terrno
;
}
void
*
pIter
=
NULL
;
while
(
1
)
{
pIter
=
sdbFetch
(
pSdb
,
SDB_QNODE
,
pIter
,
(
void
**
)
&
pObj
);
if
(
pIter
==
NULL
)
break
;
SQueryNodeAddr
nodeAddr
=
{
0
};
nodeAddr
.
nodeId
=
QNODE_HANDLE
;
nodeAddr
.
epSet
.
numOfEps
=
1
;
tstrncpy
(
nodeAddr
.
epSet
.
eps
[
0
].
fqdn
,
pObj
->
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
nodeAddr
.
epSet
.
eps
[
0
].
port
=
pObj
->
pDnode
->
port
;
SQueryNodeLoad
nodeLoad
=
{
0
};
nodeLoad
.
addr
.
nodeId
=
QNODE_HANDLE
;
nodeLoad
.
addr
.
epSet
.
numOfEps
=
1
;
tstrncpy
(
nodeLoad
.
addr
.
epSet
.
eps
[
0
].
fqdn
,
pObj
->
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
nodeLoad
.
addr
.
epSet
.
eps
[
0
].
port
=
pObj
->
pDnode
->
port
;
nodeLoad
.
load
=
QNODE_LOAD_VALUE
(
pObj
);
(
void
)
taosArrayPush
(
q
listRsp
.
addrsList
,
&
nodeAddr
);
(
void
)
taosArrayPush
(
q
nodeList
,
&
nodeLoad
);
numOfRows
++
;
sdbRelease
(
pSdb
,
pObj
);
if
(
qlistReq
.
rowNum
>
0
&&
numOfRows
>=
qlistReq
.
rowNum
)
{
if
(
limit
>
0
&&
numOfRows
>=
limit
)
{
break
;
}
}
*
pList
=
qnodeList
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mndProcessQnodeListReq
(
SRpcMsg
*
pReq
)
{
int32_t
code
=
-
1
;
SMnode
*
pMnode
=
pReq
->
info
.
node
;
SQnodeListReq
qlistReq
=
{
0
};
SQnodeListRsp
qlistRsp
=
{
0
};
if
(
tDeserializeSQnodeListReq
(
pReq
->
pCont
,
pReq
->
contLen
,
&
qlistReq
)
!=
0
)
{
mError
(
"failed to parse qnode list req"
);
terrno
=
TSDB_CODE_INVALID_MSG
;
goto
_OVER
;
}
if
(
mndCreateQnodeList
(
pMnode
,
&
qlistRsp
.
qnodeList
,
qlistReq
.
rowNum
)
!=
0
)
{
goto
_OVER
;
}
int32_t
rspLen
=
tSerializeSQnodeListRsp
(
NULL
,
0
,
&
qlistRsp
);
void
*
pRsp
=
rpcMallocCont
(
rspLen
);
if
(
pRsp
==
NULL
)
{
...
...
source/dnode/qnode/src/qnode.c
浏览文件 @
496ae4ca
...
...
@@ -41,12 +41,24 @@ void qndClose(SQnode *pQnode) {
}
int32_t
qndGetLoad
(
SQnode
*
pQnode
,
SQnodeLoad
*
pLoad
)
{
SMsgCb
*
pCb
=
&
pQnode
->
msgCb
;
SReadHandle
handle
=
{.
pMsgCb
=
&
pQnode
->
msgCb
};
SQWorkerStat
stat
=
{
0
};
int32_t
code
=
qWorkerGetStat
(
&
handle
,
pQnode
->
pQuery
,
&
stat
);
if
(
code
)
{
return
code
;
}
pLoad
->
numOfQueryInQueue
=
pCb
->
qsizeFp
(
pCb
->
mgmt
,
pQnode
->
qndId
,
QUERY_QUEUE
);
pLoad
->
numOfFetchInQueue
=
pCb
->
qsizeFp
(
pCb
->
mgmt
,
pQnode
->
qndId
,
FETCH_QUEUE
);
pLoad
->
waitTimeInQueryQUeue
=
qWorkerGetWaitTimeInQueue
(
pQnode
->
pQuery
,
QUERY_QUEUE
);
pLoad
->
waitTimeInFetchQUeue
=
qWorkerGetWaitTimeInQueue
(
pQnode
->
pQuery
,
FETCH_QUEUE
);
pLoad
->
numOfQueryInQueue
=
stat
.
numOfQueryInQueue
;
pLoad
->
numOfFetchInQueue
=
stat
.
numOfFetchInQueue
;
pLoad
->
timeInQueryQueue
=
stat
.
timeInQueryQueue
;
pLoad
->
timeInFetchQueue
=
stat
.
timeInFetchQueue
;
pLoad
->
cacheDataSize
=
stat
.
cacheDataSize
;
pLoad
->
numOfProcessedQuery
=
stat
.
queryProcessed
;
pLoad
->
numOfProcessedCQuery
=
stat
.
cqueryProcessed
;
pLoad
->
numOfProcessedFetch
=
stat
.
fetchProcessed
;
pLoad
->
numOfProcessedDrop
=
stat
.
dropProcessed
;
pLoad
->
numOfProcessedHb
=
stat
.
hbProcessed
;
return
0
;
}
...
...
source/libs/catalog/inc/catalogInt.h
浏览文件 @
496ae4ca
...
...
@@ -342,16 +342,16 @@ typedef struct SCtgOperation {
ctgOpFunc
func
;
}
SCtgOperation
;
#define CTG_QUEUE_
ADD
() atomic_add_fetch_64(&gCtgMgmt.queue.qRemainNum, 1)
#define CTG_QUEUE_
SUB
() atomic_sub_fetch_64(&gCtgMgmt.queue.qRemainNum, 1)
#define CTG_QUEUE_
INC
() atomic_add_fetch_64(&gCtgMgmt.queue.qRemainNum, 1)
#define CTG_QUEUE_
DEC
() atomic_sub_fetch_64(&gCtgMgmt.queue.qRemainNum, 1)
#define CTG_STAT_
ADD
(_item, _n) atomic_add_fetch_64(&(_item), _n)
#define CTG_STAT_
SUB
(_item, _n) atomic_sub_fetch_64(&(_item), _n)
#define CTG_STAT_
INC
(_item, _n) atomic_add_fetch_64(&(_item), _n)
#define CTG_STAT_
DEC
(_item, _n) atomic_sub_fetch_64(&(_item), _n)
#define CTG_STAT_GET(_item) atomic_load_64(&(_item))
#define CTG_R
UNTIME_STAT_ADD(item, n) (CTG_STAT_ADD
(gCtgMgmt.stat.runtime.item, n))
#define CTG_CACHE_STAT_
ADD(item, n) (CTG_STAT_ADD
(gCtgMgmt.stat.cache.item, n))
#define CTG_CACHE_STAT_
SUB(item, n) (CTG_STAT_SUB
(gCtgMgmt.stat.cache.item, n))
#define CTG_R
T_STAT_INC(item, n) (CTG_STAT_INC
(gCtgMgmt.stat.runtime.item, n))
#define CTG_CACHE_STAT_
INC(item, n) (CTG_STAT_INC
(gCtgMgmt.stat.cache.item, n))
#define CTG_CACHE_STAT_
DEC(item, n) (CTG_STAT_DEC
(gCtgMgmt.stat.cache.item, n))
#define CTG_IS_META_NULL(type) ((type) == META_TYPE_NULL_TABLE)
#define CTG_IS_META_CTABLE(type) ((type) == META_TYPE_CTABLE)
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
496ae4ca
...
...
@@ -558,7 +558,7 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
*
catalogHandle
=
clusterCtg
;
CTG_CACHE_STAT_
ADD
(
clusterNum
,
1
);
CTG_CACHE_STAT_
INC
(
clusterNum
,
1
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -579,7 +579,7 @@ void catalogFreeHandle(SCatalog* pCtg) {
return
;
}
CTG_CACHE_STAT_
SUB
(
clusterNum
,
1
);
CTG_CACHE_STAT_
DEC
(
clusterNum
,
1
);
uint64_t
clusterId
=
pCtg
->
clusterId
;
...
...
@@ -990,7 +990,7 @@ int32_t catalogGetAllMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps,
}
if
(
pReq
->
qNodeRequired
)
{
pRsp
->
pQnodeList
=
taosArrayInit
(
10
,
sizeof
(
SQueryNode
Addr
));
pRsp
->
pQnodeList
=
taosArrayInit
(
10
,
sizeof
(
SQueryNode
Load
));
CTG_ERR_JRET
(
ctgGetQnodeListFromMnode
(
CTG_PARAMS_LIST
(),
pRsp
->
pQnodeList
,
NULL
));
}
...
...
source/libs/catalog/src/ctgCache.c
浏览文件 @
496ae4ca
...
...
@@ -178,7 +178,7 @@ int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCac
*
pCache
=
dbCache
;
CTG_CACHE_STAT_
ADD
(
vgHitNum
,
1
);
CTG_CACHE_STAT_
INC
(
vgHitNum
,
1
);
ctgDebug
(
"Got db vgInfo from cache, dbFName:%s"
,
dbFName
);
...
...
@@ -192,7 +192,7 @@ _return:
*
pCache
=
NULL
;
CTG_CACHE_STAT_
ADD
(
vgMissNum
,
1
);
CTG_CACHE_STAT_
INC
(
vgMissNum
,
1
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -279,7 +279,7 @@ int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta**
ctgReleaseDBCache
(
pCtg
,
dbCache
);
ctgDebug
(
"Got meta from cache, type:%d, dbFName:%s, tbName:%s"
,
tbMeta
->
tableType
,
dbFName
,
ctx
->
pName
->
tname
);
CTG_CACHE_STAT_
ADD
(
tblHitNum
,
1
);
CTG_CACHE_STAT_
INC
(
tblHitNum
,
1
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -312,7 +312,7 @@ int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta**
ctgReleaseDBCache
(
pCtg
,
dbCache
);
CTG_CACHE_STAT_
ADD
(
tblHitNum
,
1
);
CTG_CACHE_STAT_
INC
(
tblHitNum
,
1
);
ctgDebug
(
"Got tbmeta from cache, dbFName:%s, tbName:%s"
,
dbFName
,
ctx
->
pName
->
tname
);
...
...
@@ -323,7 +323,7 @@ _return:
ctgReleaseDBCache
(
pCtg
,
dbCache
);
taosMemoryFreeClear
(
*
pTableMeta
);
CTG_CACHE_STAT_
ADD
(
tblMissNum
,
1
);
CTG_CACHE_STAT_
INC
(
tblMissNum
,
1
);
CTG_RET
(
code
);
}
...
...
@@ -462,7 +462,7 @@ int32_t ctgChkAuthFromCache(SCatalog* pCtg, const char* user, const char* dbFNam
*
inCache
=
true
;
ctgDebug
(
"Got user from cache, user:%s"
,
user
);
CTG_CACHE_STAT_
ADD
(
userHitNum
,
1
);
CTG_CACHE_STAT_
INC
(
userHitNum
,
1
);
if
(
pUser
->
superUser
)
{
*
pass
=
true
;
...
...
@@ -491,7 +491,7 @@ int32_t ctgChkAuthFromCache(SCatalog* pCtg, const char* user, const char* dbFNam
_return:
*
inCache
=
false
;
CTG_CACHE_STAT_
ADD
(
userMissNum
,
1
);
CTG_CACHE_STAT_
INC
(
userMissNum
,
1
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -521,7 +521,7 @@ void ctgDequeue(SCtgCacheOperation **op) {
SCtgQNode
*
node
=
gCtgMgmt
.
queue
.
head
->
next
;
gCtgMgmt
.
queue
.
head
=
gCtgMgmt
.
queue
.
head
->
next
;
CTG_QUEUE_
SUB
();
CTG_QUEUE_
DEC
();
taosMemoryFreeClear
(
orig
);
...
...
@@ -545,8 +545,8 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
gCtgMgmt
.
queue
.
tail
=
node
;
CTG_UNLOCK
(
CTG_WRITE
,
&
gCtgMgmt
.
queue
.
qlock
);
CTG_QUEUE_
ADD
();
CTG_R
UNTIME_STAT_ADD
(
qNum
,
1
);
CTG_QUEUE_
INC
();
CTG_R
T_STAT_INC
(
qNum
,
1
);
tsem_post
(
&
gCtgMgmt
.
queue
.
reqSem
);
...
...
@@ -988,7 +988,7 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
CTG_ERR_JRET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
CTG_CACHE_STAT_
ADD
(
dbNum
,
1
);
CTG_CACHE_STAT_
INC
(
dbNum
,
1
);
SDbVgVersion
vgVersion
=
{.
dbId
=
newDBCache
.
dbId
,
.
vgVersion
=
-
1
};
strncpy
(
vgVersion
.
dbFName
,
dbFName
,
sizeof
(
vgVersion
.
dbFName
));
...
...
@@ -1048,7 +1048,7 @@ int32_t ctgRemoveDBFromCache(SCatalog* pCtg, SCtgDBCache *dbCache, const char* d
CTG_ERR_RET
(
TSDB_CODE_CTG_DB_DROPPED
);
}
CTG_CACHE_STAT_
SUB
(
dbNum
,
1
);
CTG_CACHE_STAT_
DEC
(
dbNum
,
1
);
ctgInfo
(
"db removed from cache, dbFName:%s, dbId:%"
PRIx64
,
dbFName
,
dbId
);
...
...
@@ -1187,7 +1187,7 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
if
(
taosHashRemove
(
tbCache
->
stbCache
,
&
orig
->
suid
,
sizeof
(
orig
->
suid
)))
{
ctgError
(
"stb not exist in stbCache, dbFName:%s, stb:%s, suid:%"
PRIx64
,
dbFName
,
tbName
,
orig
->
suid
);
}
else
{
CTG_CACHE_STAT_
SUB
(
stblNum
,
1
);
CTG_CACHE_STAT_
DEC
(
stblNum
,
1
);
}
CTG_UNLOCK
(
CTG_WRITE
,
&
tbCache
->
stbLock
);
...
...
@@ -1214,7 +1214,7 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
}
if
(
NULL
==
orig
)
{
CTG_CACHE_STAT_
ADD
(
tblNum
,
1
);
CTG_CACHE_STAT_
INC
(
tblNum
,
1
);
}
ctgDebug
(
"tbmeta updated to cache, dbFName:%s, tbName:%s, tbType:%d"
,
dbFName
,
tbName
,
meta
->
tableType
);
...
...
@@ -1233,7 +1233,7 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
CTG_CACHE_STAT_
ADD
(
stblNum
,
1
);
CTG_CACHE_STAT_
INC
(
stblNum
,
1
);
CTG_UNLOCK
(
CTG_WRITE
,
&
tbCache
->
stbLock
);
...
...
@@ -1371,14 +1371,14 @@ int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) {
if
(
taosHashRemove
(
dbCache
->
tbCache
.
stbCache
,
&
msg
->
suid
,
sizeof
(
msg
->
suid
)))
{
ctgDebug
(
"stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:%"
PRIx64
,
msg
->
dbFName
,
msg
->
stbName
,
msg
->
suid
);
}
else
{
CTG_CACHE_STAT_
SUB
(
stblNum
,
1
);
CTG_CACHE_STAT_
DEC
(
stblNum
,
1
);
}
CTG_LOCK
(
CTG_READ
,
&
dbCache
->
tbCache
.
metaLock
);
if
(
taosHashRemove
(
dbCache
->
tbCache
.
metaCache
,
msg
->
stbName
,
strlen
(
msg
->
stbName
)))
{
ctgError
(
"stb not exist in cache, dbFName:%s, stb:%s, suid:%"
PRIx64
,
msg
->
dbFName
,
msg
->
stbName
,
msg
->
suid
);
}
else
{
CTG_CACHE_STAT_
SUB
(
tblNum
,
1
);
CTG_CACHE_STAT_
DEC
(
tblNum
,
1
);
}
CTG_UNLOCK
(
CTG_READ
,
&
dbCache
->
tbCache
.
metaLock
);
...
...
@@ -1419,7 +1419,7 @@ int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) {
ctgError
(
"stb not exist in cache, dbFName:%s, tbName:%s"
,
msg
->
dbFName
,
msg
->
tbName
);
CTG_ERR_RET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
else
{
CTG_CACHE_STAT_
SUB
(
tblNum
,
1
);
CTG_CACHE_STAT_
DEC
(
tblNum
,
1
);
}
CTG_UNLOCK
(
CTG_READ
,
&
dbCache
->
tbCache
.
metaLock
);
...
...
@@ -1578,7 +1578,7 @@ void* ctgUpdateThreadFunc(void* param) {
tsem_post
(
&
gCtgMgmt
.
queue
.
rspSem
);
}
CTG_R
UNTIME_STAT_ADD
(
qDoneNum
,
1
);
CTG_R
T_STAT_INC
(
qDoneNum
,
1
);
ctgdShowClusterCache
(
pCtg
);
}
...
...
source/libs/catalog/src/ctgRemote.c
浏览文件 @
496ae4ca
...
...
@@ -275,7 +275,7 @@ int32_t ctgGetQnodeListFromMnode(CTG_PARAMS, SArray *out, SCtgTask* pTask) {
}
if
(
pTask
)
{
void
*
pOut
=
taosArrayInit
(
4
,
sizeof
(
struct
SQueryNodeAddr
));
void
*
pOut
=
taosArrayInit
(
4
,
sizeof
(
SQueryNodeLoad
));
if
(
NULL
==
pOut
)
{
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
...
...
source/libs/catalog/src/ctgUtil.c
浏览文件 @
496ae4ca
...
...
@@ -85,7 +85,7 @@ void ctgFreeTbMetaCache(SCtgTbMetaCache *cache) {
int32_t
stblNum
=
taosHashGetSize
(
cache
->
stbCache
);
taosHashCleanup
(
cache
->
stbCache
);
cache
->
stbCache
=
NULL
;
CTG_CACHE_STAT_
SUB
(
stblNum
,
stblNum
);
CTG_CACHE_STAT_
DEC
(
stblNum
,
stblNum
);
}
CTG_UNLOCK
(
CTG_WRITE
,
&
cache
->
stbLock
);
...
...
@@ -94,7 +94,7 @@ void ctgFreeTbMetaCache(SCtgTbMetaCache *cache) {
int32_t
tblNum
=
taosHashGetSize
(
cache
->
metaCache
);
taosHashCleanup
(
cache
->
metaCache
);
cache
->
metaCache
=
NULL
;
CTG_CACHE_STAT_
SUB
(
tblNum
,
tblNum
);
CTG_CACHE_STAT_
DEC
(
tblNum
,
tblNum
);
}
CTG_UNLOCK
(
CTG_WRITE
,
&
cache
->
metaLock
);
}
...
...
@@ -145,7 +145,7 @@ void ctgFreeHandle(SCatalog* pCtg) {
taosHashCleanup
(
pCtg
->
dbCache
);
CTG_CACHE_STAT_
SUB
(
dbNum
,
dbNum
);
CTG_CACHE_STAT_
DEC
(
dbNum
,
dbNum
);
}
if
(
pCtg
->
userCache
)
{
...
...
@@ -162,7 +162,7 @@ void ctgFreeHandle(SCatalog* pCtg) {
taosHashCleanup
(
pCtg
->
userCache
);
CTG_CACHE_STAT_
SUB
(
userNum
,
userNum
);
CTG_CACHE_STAT_
DEC
(
userNum
,
userNum
);
}
taosMemoryFree
(
pCtg
);
...
...
source/libs/executor/inc/dataSinkInt.h
浏览文件 @
496ae4ca
...
...
@@ -37,6 +37,7 @@ typedef void (*FEndPut)(struct SDataSinkHandle* pHandle, uint64_t useconds);
typedef
void
(
*
FGetDataLength
)(
struct
SDataSinkHandle
*
pHandle
,
int32_t
*
pLen
,
bool
*
pQueryEnd
);
typedef
int32_t
(
*
FGetDataBlock
)(
struct
SDataSinkHandle
*
pHandle
,
SOutputData
*
pOutput
);
typedef
int32_t
(
*
FDestroyDataSinker
)(
struct
SDataSinkHandle
*
pHandle
);
typedef
int32_t
(
*
FGetCacheSize
)(
struct
SDataSinkHandle
*
pHandle
,
uint64_t
*
size
);
typedef
struct
SDataSinkHandle
{
FPutDataBlock
fPut
;
...
...
@@ -44,6 +45,7 @@ typedef struct SDataSinkHandle {
FGetDataLength
fGetLen
;
FGetDataBlock
fGetData
;
FDestroyDataSinker
fDestroy
;
FGetCacheSize
fGetCacheSize
;
}
SDataSinkHandle
;
int32_t
createDataDispatcher
(
SDataSinkManager
*
pManager
,
const
SDataSinkNode
*
pDataSink
,
DataSinkHandle
*
pHandle
);
...
...
source/libs/executor/src/dataDispatcher.c
浏览文件 @
496ae4ca
...
...
@@ -22,6 +22,8 @@
#include "tglobal.h"
#include "tqueue.h"
extern
SDataSinkStat
gDataSinkStat
;
typedef
struct
SDataDispatchBuf
{
int32_t
useSize
;
int32_t
allocSize
;
...
...
@@ -45,6 +47,7 @@ typedef struct SDataDispatchHandle {
int32_t
status
;
bool
queryEnd
;
uint64_t
useconds
;
uint64_t
cachedSize
;
TdThreadMutex
mutex
;
}
SDataDispatchHandle
;
...
...
@@ -71,7 +74,7 @@ static bool needCompress(const SSDataBlock* pData, int32_t numOfCols) {
// +----------------+--------------+----------+--------------------------------------+-------------+-----------+-------------+-----------+
// The length of bitmap is decided by number of rows of this data block, and the length of each column data is
// recorded in the first segment, next to the struct header
static
void
toDataCacheEntry
(
const
SDataDispatchHandle
*
pHandle
,
const
SInputData
*
pInput
,
SDataDispatchBuf
*
pBuf
)
{
static
void
toDataCacheEntry
(
SDataDispatchHandle
*
pHandle
,
const
SInputData
*
pInput
,
SDataDispatchBuf
*
pBuf
)
{
int32_t
numOfCols
=
LIST_LENGTH
(
pHandle
->
pSchema
->
pSlots
);
SDataCacheEntry
*
pEntry
=
(
SDataCacheEntry
*
)
pBuf
->
pData
;
...
...
@@ -84,6 +87,9 @@ static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputDat
blockCompressEncode
(
pInput
->
pData
,
pEntry
->
data
,
&
pEntry
->
dataLen
,
numOfCols
,
pEntry
->
compressed
);
pBuf
->
useSize
+=
pEntry
->
dataLen
;
atomic_add_fetch_64
(
&
pHandle
->
cachedSize
,
pEntry
->
dataLen
);
atomic_add_fetch_64
(
&
gDataSinkStat
.
cachedSize
,
pEntry
->
dataLen
);
}
static
bool
allocBuf
(
SDataDispatchHandle
*
pDispatcher
,
const
SInputData
*
pInput
,
SDataDispatchBuf
*
pBuf
)
{
...
...
@@ -156,6 +162,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryE
taosFreeQitem
(
pBuf
);
*
pLen
=
((
SDataCacheEntry
*
)(
pDispatcher
->
nextOutput
.
pData
))
->
dataLen
;
*
pQueryEnd
=
pDispatcher
->
queryEnd
;
qDebug
(
"got data len %d, row num %d in sink"
,
*
pLen
,
((
SDataCacheEntry
*
)(
pDispatcher
->
nextOutput
.
pData
))
->
numOfRows
);
}
static
int32_t
getDataBlock
(
SDataSinkHandle
*
pHandle
,
SOutputData
*
pOutput
)
{
...
...
@@ -173,6 +180,10 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
pOutput
->
numOfRows
=
pEntry
->
numOfRows
;
pOutput
->
numOfCols
=
pEntry
->
numOfCols
;
pOutput
->
compressed
=
pEntry
->
compressed
;
atomic_sub_fetch_64
(
&
pDispatcher
->
cachedSize
,
pEntry
->
dataLen
);
atomic_sub_fetch_64
(
&
gDataSinkStat
.
cachedSize
,
pEntry
->
dataLen
);
taosMemoryFreeClear
(
pDispatcher
->
nextOutput
.
pData
);
// todo persistent
pOutput
->
bufStatus
=
updateStatus
(
pDispatcher
);
taosThreadMutexLock
(
&
pDispatcher
->
mutex
);
...
...
@@ -180,11 +191,14 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
pOutput
->
useconds
=
pDispatcher
->
useconds
;
pOutput
->
precision
=
pDispatcher
->
pSchema
->
precision
;
taosThreadMutexUnlock
(
&
pDispatcher
->
mutex
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
destroyDataSinker
(
SDataSinkHandle
*
pHandle
)
{
SDataDispatchHandle
*
pDispatcher
=
(
SDataDispatchHandle
*
)
pHandle
;
atomic_sub_fetch_64
(
&
gDataSinkStat
.
cachedSize
,
pDispatcher
->
cachedSize
);
taosMemoryFreeClear
(
pDispatcher
->
nextOutput
.
pData
);
while
(
!
taosQueueEmpty
(
pDispatcher
->
pDataBlocks
))
{
SDataDispatchBuf
*
pBuf
=
NULL
;
...
...
@@ -197,6 +211,13 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
getCacheSize
(
struct
SDataSinkHandle
*
pHandle
,
uint64_t
*
size
)
{
SDataDispatchHandle
*
pDispatcher
=
(
SDataDispatchHandle
*
)
pHandle
;
*
size
=
atomic_load_64
(
&
pDispatcher
->
cachedSize
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
createDataDispatcher
(
SDataSinkManager
*
pManager
,
const
SDataSinkNode
*
pDataSink
,
DataSinkHandle
*
pHandle
)
{
SDataDispatchHandle
*
dispatcher
=
taosMemoryCalloc
(
1
,
sizeof
(
SDataDispatchHandle
));
if
(
NULL
==
dispatcher
)
{
...
...
@@ -208,6 +229,7 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD
dispatcher
->
sink
.
fGetLen
=
getDataLength
;
dispatcher
->
sink
.
fGetData
=
getDataBlock
;
dispatcher
->
sink
.
fDestroy
=
destroyDataSinker
;
dispatcher
->
sink
.
fGetCacheSize
=
getCacheSize
;
dispatcher
->
pManager
=
pManager
;
dispatcher
->
pSchema
=
pDataSink
->
pInputDataBlockDesc
;
dispatcher
->
status
=
DS_BUF_EMPTY
;
...
...
source/libs/executor/src/dataSinkMgt.c
浏览文件 @
496ae4ca
...
...
@@ -19,6 +19,7 @@
#include "planner.h"
static
SDataSinkManager
gDataSinkManager
=
{
0
};
SDataSinkStat
gDataSinkStat
=
{
0
};
int32_t
dsDataSinkMgtInit
(
SDataSinkMgtCfg
*
cfg
)
{
gDataSinkManager
.
cfg
=
*
cfg
;
...
...
@@ -26,6 +27,13 @@ int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg) {
return
0
;
// to avoid compiler eror
}
int32_t
dsDataSinkGetCacheSize
(
SDataSinkStat
*
pStat
)
{
pStat
->
cachedSize
=
atomic_load_64
(
&
gDataSinkStat
.
cachedSize
);
return
0
;
}
int32_t
dsCreateDataSinker
(
const
SDataSinkNode
*
pDataSink
,
DataSinkHandle
*
pHandle
)
{
if
(
QUERY_NODE_PHYSICAL_PLAN_DISPATCH
==
nodeType
(
pDataSink
))
{
return
createDataDispatcher
(
&
gDataSinkManager
,
pDataSink
,
pHandle
);
...
...
@@ -53,6 +61,12 @@ int32_t dsGetDataBlock(DataSinkHandle handle, SOutputData* pOutput) {
return
pHandleImpl
->
fGetData
(
pHandleImpl
,
pOutput
);
}
int32_t
dsGetCacheSize
(
DataSinkHandle
handle
,
uint64_t
*
pSize
)
{
SDataSinkHandle
*
pHandleImpl
=
(
SDataSinkHandle
*
)
handle
;
return
pHandleImpl
->
fGetCacheSize
(
pHandleImpl
,
pSize
);
}
void
dsScheduleProcess
(
void
*
ahandle
,
void
*
pItem
)
{
// todo
}
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
496ae4ca
...
...
@@ -303,7 +303,9 @@ void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock)
int32_t
dstSlotId
=
pExpr
->
base
.
resSchema
.
slotId
;
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
dstSlotId
);
colInfoDataEnsureCapacity
(
pColInfoData
,
0
,
pBlock
->
info
.
rows
);
colInfoDataCleanup
(
pColInfoData
,
pBlock
->
info
.
rows
);
int32_t
functionId
=
pExpr
->
pExpr
->
_function
.
functionId
;
...
...
source/libs/monitor/src/monMsg.c
浏览文件 @
496ae4ca
...
...
@@ -556,4 +556,50 @@ int32_t tDeserializeSMonMloadInfo(void *buf, int32_t bufLen, SMonMloadInfo *pInf
tDecoderClear
(
&
decoder
);
return
0
;
}
\ No newline at end of file
}
int32_t
tSerializeSQnodeLoad
(
void
*
buf
,
int32_t
bufLen
,
SQnodeLoad
*
pInfo
)
{
SEncoder
encoder
=
{
0
};
tEncoderInit
(
&
encoder
,
buf
,
bufLen
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pInfo
->
numOfProcessedQuery
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pInfo
->
numOfProcessedCQuery
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pInfo
->
numOfProcessedFetch
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pInfo
->
numOfProcessedDrop
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pInfo
->
numOfProcessedHb
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pInfo
->
cacheDataSize
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pInfo
->
numOfQueryInQueue
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pInfo
->
numOfFetchInQueue
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pInfo
->
timeInQueryQueue
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pInfo
->
timeInFetchQueue
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
tEncoderClear
(
&
encoder
);
return
tlen
;
}
int32_t
tDeserializeSQnodeLoad
(
void
*
buf
,
int32_t
bufLen
,
SQnodeLoad
*
pInfo
)
{
SDecoder
decoder
=
{
0
};
tDecoderInit
(
&
decoder
,
buf
,
bufLen
);
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pInfo
->
numOfProcessedQuery
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pInfo
->
numOfProcessedCQuery
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pInfo
->
numOfProcessedFetch
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pInfo
->
numOfProcessedDrop
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pInfo
->
numOfProcessedHb
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pInfo
->
cacheDataSize
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pInfo
->
numOfQueryInQueue
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pInfo
->
numOfFetchInQueue
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pInfo
->
timeInQueryQueue
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pInfo
->
timeInFetchQueue
)
<
0
)
return
-
1
;
tEndDecode
(
&
decoder
);
tDecoderClear
(
&
decoder
);
return
0
;
}
source/libs/planner/src/planPhysiCreater.c
浏览文件 @
496ae4ca
...
...
@@ -468,6 +468,7 @@ static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubpla
return
TSDB_CODE_OUT_OF_MEMORY
;
}
vgroupInfoToNodeAddr
(
pScanLogicNode
->
pVgroupList
->
vgroups
,
&
pSubplan
->
execNode
);
SQueryNodeLoad
node
=
{.
addr
=
pSubplan
->
execNode
,
.
load
=
0
};
taosArrayPush
(
pCxt
->
pExecNodeList
,
&
pSubplan
->
execNode
);
return
createScanPhysiNodeFinalize
(
pCxt
,
pSubplan
,
pScanLogicNode
,
(
SScanPhysiNode
*
)
pTagScan
,
pPhyNode
);
}
...
...
@@ -489,7 +490,8 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
pSubplan
->
execNodeStat
.
tableNum
=
pScanLogicNode
->
pVgroupList
->
vgroups
[
0
].
numOfTable
;
}
if
(
pCxt
->
pExecNodeList
)
{
taosArrayPush
(
pCxt
->
pExecNodeList
,
&
pSubplan
->
execNode
);
SQueryNodeLoad
node
=
{.
addr
=
pSubplan
->
execNode
,
.
load
=
0
};
taosArrayPush
(
pCxt
->
pExecNodeList
,
&
node
);
}
tNameGetFullDbName
(
&
pScanLogicNode
->
tableName
,
pSubplan
->
dbFName
);
pTableScan
->
dataRequired
=
pScanLogicNode
->
dataRequired
;
...
...
@@ -520,10 +522,11 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
pScan
->
accountId
=
pCxt
->
pPlanCxt
->
acctId
;
if
(
0
==
strcmp
(
pScanLogicNode
->
tableName
.
tname
,
TSDB_INS_TABLE_USER_TABLES
))
{
vgroupInfoToNodeAddr
(
pScanLogicNode
->
pVgroupList
->
vgroups
,
&
pSubplan
->
execNode
);
SQueryNodeLoad
node
=
{
.
addr
=
pSubplan
->
execNode
,
.
load
=
0
};
taosArrayPush
(
pCxt
->
pExecNodeList
,
&
pSubplan
->
execNode
);
}
else
{
SQueryNode
Addr
addr
=
{.
nodeId
=
MNODE_HANDLE
,
.
epSet
=
pCxt
->
pPlanCxt
->
mgmtEpSet
};
taosArrayPush
(
pCxt
->
pExecNodeList
,
&
addr
);
SQueryNode
Load
node
=
{
.
addr
=
{.
nodeId
=
MNODE_HANDLE
,
.
epSet
=
pCxt
->
pPlanCxt
->
mgmtEpSet
},
.
load
=
0
};
taosArrayPush
(
pCxt
->
pExecNodeList
,
&
node
);
}
pScan
->
mgmtEpSet
=
pCxt
->
pPlanCxt
->
mgmtEpSet
;
tNameGetFullDbName
(
&
pScanLogicNode
->
tableName
,
pSubplan
->
dbFName
);
...
...
@@ -1243,7 +1246,8 @@ static int32_t createPhysiSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogic
SVnodeModifLogicNode
*
pModif
=
(
SVnodeModifLogicNode
*
)
pLogicSubplan
->
pNode
;
pSubplan
->
msgType
=
pModif
->
msgType
;
pSubplan
->
execNode
.
epSet
=
pModif
->
pVgDataBlocks
->
vg
.
epSet
;
taosArrayPush
(
pCxt
->
pExecNodeList
,
&
pSubplan
->
execNode
);
SQueryNodeLoad
node
=
{.
addr
=
pSubplan
->
execNode
,
.
load
=
0
};
taosArrayPush
(
pCxt
->
pExecNodeList
,
&
node
);
code
=
createDataInserter
(
pCxt
,
pModif
->
pVgDataBlocks
,
&
pSubplan
->
pDataSink
);
}
else
{
pSubplan
->
msgType
=
TDMT_VND_QUERY
;
...
...
source/libs/qcom/src/querymsg.c
浏览文件 @
496ae4ca
...
...
@@ -373,7 +373,7 @@ int32_t queryProcessQnodeListRsp(void *output, char *msg, int32_t msgSize) {
return
code
;
}
out
.
addrs
List
=
(
SArray
*
)
output
;
out
.
qnode
List
=
(
SArray
*
)
output
;
if
(
tDeserializeSQnodeListRsp
(
msg
,
msgSize
,
&
out
)
!=
0
)
{
qError
(
"invalid qnode list rsp msg, msgSize:%d"
,
msgSize
);
code
=
TSDB_CODE_INVALID_MSG
;
...
...
source/libs/qworker/inc/qwInt.h
浏览文件 @
496ae4ca
...
...
@@ -145,13 +145,30 @@ typedef struct SQWSchStatus {
SHashObj
*
tasksHash
;
// key:queryId+taskId, value: SQWTaskStatus
}
SQWSchStatus
;
typedef
struct
SQW
WaitTimeStat
{
typedef
struct
SQW
TimeInQ
{
uint64_t
num
;
uint64_t
total
;
}
SQWWaitTimeStat
;
}
SQWTimeInQ
;
typedef
struct
SQWMsgStat
{
SQWTimeInQ
waitTime
[
2
];
uint64_t
queryProcessed
;
uint64_t
cqueryProcessed
;
uint64_t
fetchProcessed
;
uint64_t
fetchRspProcessed
;
uint64_t
cancelProcessed
;
uint64_t
dropProcessed
;
uint64_t
hbProcessed
;
}
SQWMsgStat
;
typedef
struct
SQWRTStat
{
uint64_t
startTaskNum
;
uint64_t
stopTaskNum
;
}
SQWRTStat
;
typedef
struct
SQWStat
{
SQWWaitTimeStat
msgWait
[
2
];
SQWMsgStat
msgStat
;
SQWRTStat
rtStat
;
}
SQWStat
;
// Qnode/Vnode level task management
...
...
@@ -182,10 +199,13 @@ typedef struct SQWorkerMgmt {
#define QW_IDS() sId, qId, tId, rId
#define QW_FPARAMS() mgmt, QW_IDS()
#define QW_GET_EVENT_VALUE(ctx, event) atomic_load_8(&(ctx)->events[event])
#define QW_STAT_INC(_item, _n) atomic_add_fetch_64(&(_item), _n)
#define QW_STAT_DEC(_item, _n) atomic_sub_fetch_64(&(_item), _n)
#define QW_STAT_GET(_item) atomic_load_64(&(_item))
#define QW_IS_EVENT_RECEIVED(ctx, event) (atomic_load_8(&(ctx)->events[event]) == QW_EVENT_RECEIVED)
#define QW_IS_EVENT_PROCESSED(ctx, event) (atomic_load_8(&(ctx)->events[event]) == QW_EVENT_PROCESSED)
#define QW_GET_EVENT(ctx, event) atomic_load_8(&(ctx)->events[event])
#define QW_IS_EVENT_RECEIVED(ctx, event) (QW_GET_EVENT(ctx, event) == QW_EVENT_RECEIVED)
#define QW_IS_EVENT_PROCESSED(ctx, event) (QW_GET_EVENT(ctx, event) == QW_EVENT_PROCESSED)
#define QW_SET_EVENT_RECEIVED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_RECEIVED)
#define QW_SET_EVENT_PROCESSED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_PROCESSED)
...
...
@@ -332,8 +352,8 @@ int32_t qwDropTask(QW_FPARAMS_DEF);
void
qwSaveTbVersionInfo
(
qTaskInfo_t
pTaskInfo
,
SQWTaskCtx
*
ctx
);
int32_t
qwOpenRef
(
void
);
void
qwSetHbParam
(
int64_t
refId
,
SQWHbParam
**
pParam
);
int32_t
qwUpdate
Wait
TimeInQueue
(
SQWorker
*
mgmt
,
int64_t
ts
,
EQueueType
type
);
int64_t
qwGet
Wait
TimeInQueue
(
SQWorker
*
mgmt
,
EQueueType
type
);
int32_t
qwUpdateTimeInQueue
(
SQWorker
*
mgmt
,
int64_t
ts
,
EQueueType
type
);
int64_t
qwGetTimeInQueue
(
SQWorker
*
mgmt
,
EQueueType
type
);
void
qwDbgDumpMgmtInfo
(
SQWorker
*
mgmt
);
int32_t
qwDbgValidateStatus
(
QW_FPARAMS_DEF
,
int8_t
oriStatus
,
int8_t
newStatus
,
bool
*
ignore
);
...
...
source/libs/qworker/src/qwMsg.c
浏览文件 @
496ae4ca
...
...
@@ -257,7 +257,8 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
SSubQueryMsg
*
msg
=
pMsg
->
pCont
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
qwUpdateWaitTimeInQueue
(
mgmt
,
ts
,
QUERY_QUEUE
);
qwUpdateTimeInQueue
(
mgmt
,
ts
,
QUERY_QUEUE
);
QW_STAT_INC
(
mgmt
->
stat
.
msgStat
.
queryProcessed
,
1
);
if
(
NULL
==
msg
||
pMsg
->
contLen
<=
sizeof
(
*
msg
))
{
QW_ELOG
(
"invalid query msg, msg:%p, msgLen:%d"
,
msg
,
pMsg
->
contLen
);
...
...
@@ -297,7 +298,8 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in
SQWTaskCtx
*
handles
=
NULL
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
qwUpdateWaitTimeInQueue
(
mgmt
,
ts
,
QUERY_QUEUE
);
qwUpdateTimeInQueue
(
mgmt
,
ts
,
QUERY_QUEUE
);
QW_STAT_INC
(
mgmt
->
stat
.
msgStat
.
cqueryProcessed
,
1
);
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
QW_ELOG
(
"invalid cquery msg, msg:%p, msgLen:%d"
,
msg
,
pMsg
->
contLen
);
...
...
@@ -328,7 +330,8 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
SResFetchReq
*
msg
=
pMsg
->
pCont
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
qwUpdateWaitTimeInQueue
(
mgmt
,
ts
,
FETCH_QUEUE
);
qwUpdateTimeInQueue
(
mgmt
,
ts
,
FETCH_QUEUE
);
QW_STAT_INC
(
mgmt
->
stat
.
msgStat
.
fetchProcessed
,
1
);
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
QW_ELOG
(
"invalid fetch msg, msg:%p, msgLen:%d"
,
msg
,
pMsg
->
contLen
);
...
...
@@ -357,7 +360,10 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
int32_t
qWorkerProcessFetchRsp
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
,
int64_t
ts
)
{
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
qwUpdateWaitTimeInQueue
(
mgmt
,
ts
,
FETCH_QUEUE
);
if
(
mgmt
)
{
qwUpdateTimeInQueue
(
mgmt
,
ts
,
FETCH_QUEUE
);
QW_STAT_INC
(
mgmt
->
stat
.
msgStat
.
fetchRspProcessed
,
1
);
}
qProcessFetchRsp
(
NULL
,
pMsg
,
NULL
);
pMsg
->
pCont
=
NULL
;
...
...
@@ -373,7 +379,8 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in
int32_t
code
=
0
;
STaskCancelReq
*
msg
=
pMsg
->
pCont
;
qwUpdateWaitTimeInQueue
(
mgmt
,
ts
,
FETCH_QUEUE
);
qwUpdateTimeInQueue
(
mgmt
,
ts
,
FETCH_QUEUE
);
QW_STAT_INC
(
mgmt
->
stat
.
msgStat
.
cancelProcessed
,
1
);
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
qError
(
"invalid task cancel msg"
);
...
...
@@ -411,7 +418,8 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6
STaskDropReq
*
msg
=
pMsg
->
pCont
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
qwUpdateWaitTimeInQueue
(
mgmt
,
ts
,
FETCH_QUEUE
);
qwUpdateTimeInQueue
(
mgmt
,
ts
,
FETCH_QUEUE
);
QW_STAT_INC
(
mgmt
->
stat
.
msgStat
.
dropProcessed
,
1
);
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
QW_ELOG
(
"invalid task drop msg, msg:%p, msgLen:%d"
,
msg
,
pMsg
->
contLen
);
...
...
@@ -452,7 +460,8 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_
SSchedulerHbReq
req
=
{
0
};
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
qwUpdateWaitTimeInQueue
(
mgmt
,
ts
,
FETCH_QUEUE
);
qwUpdateTimeInQueue
(
mgmt
,
ts
,
FETCH_QUEUE
);
QW_STAT_INC
(
mgmt
->
stat
.
msgStat
.
hbProcessed
,
1
);
if
(
NULL
==
pMsg
->
pCont
)
{
QW_ELOG
(
"invalid hb msg, msg:%p, msgLen:%d"
,
pMsg
->
pCont
,
pMsg
->
contLen
);
...
...
source/libs/qworker/src/qwUtil.c
浏览文件 @
496ae4ca
...
...
@@ -499,7 +499,7 @@ int32_t qwOpenRef(void) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwUpdate
Wait
TimeInQueue
(
SQWorker
*
mgmt
,
int64_t
ts
,
EQueueType
type
)
{
int32_t
qwUpdateTimeInQueue
(
SQWorker
*
mgmt
,
int64_t
ts
,
EQueueType
type
)
{
if
(
ts
<=
0
)
{
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -507,12 +507,12 @@ int32_t qwUpdateWaitTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type) {
int64_t
duration
=
taosGetTimestampUs
()
-
ts
;
switch
(
type
)
{
case
QUERY_QUEUE
:
++
mgmt
->
stat
.
msg
Wait
[
0
].
num
;
mgmt
->
stat
.
msg
Wait
[
0
].
total
+=
duration
;
++
mgmt
->
stat
.
msg
Stat
.
waitTime
[
0
].
num
;
mgmt
->
stat
.
msg
Stat
.
waitTime
[
0
].
total
+=
duration
;
break
;
case
FETCH_QUEUE
:
++
mgmt
->
stat
.
msg
Wait
[
1
].
num
;
mgmt
->
stat
.
msg
Wait
[
1
].
total
+=
duration
;
++
mgmt
->
stat
.
msg
Stat
.
waitTime
[
1
].
num
;
mgmt
->
stat
.
msg
Stat
.
waitTime
[
1
].
total
+=
duration
;
break
;
default:
qError
(
"unsupported queue type %d"
,
type
);
...
...
@@ -522,19 +522,20 @@ int32_t qwUpdateWaitTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type) {
return
TSDB_CODE_SUCCESS
;
}
int64_t
qwGet
Wait
TimeInQueue
(
SQWorker
*
mgmt
,
EQueueType
type
)
{
SQW
WaitTimeStat
*
pStat
=
NULL
;
int64_t
qwGetTimeInQueue
(
SQWorker
*
mgmt
,
EQueueType
type
)
{
SQW
TimeInQ
*
pStat
=
NULL
;
switch
(
type
)
{
case
QUERY_QUEUE
:
pStat
=
&
mgmt
->
stat
.
msg
Wait
[
0
];
pStat
=
&
mgmt
->
stat
.
msg
Stat
.
waitTime
[
0
];
return
pStat
->
num
?
(
pStat
->
total
/
pStat
->
num
)
:
0
;
case
FETCH_QUEUE
:
pStat
=
&
mgmt
->
stat
.
msg
Wait
[
1
];
pStat
=
&
mgmt
->
stat
.
msg
Stat
.
waitTime
[
1
];
return
pStat
->
num
?
(
pStat
->
total
/
pStat
->
num
)
:
0
;
default:
qError
(
"unsupported queue type %d"
,
type
);
return
-
1
;
}
return
-
1
;
}
...
...
source/libs/qworker/src/qworker.c
浏览文件 @
496ae4ca
#include "qworker.h"
#include "dataSinkMgt.h"
#include "executor.h"
#include "planner.h"
...
...
@@ -8,6 +7,7 @@
#include "tcommon.h"
#include "tmsg.h"
#include "tname.h"
#include "qworker.h"
SQWorkerMgmt
gQwMgmt
=
{
.
lock
=
0
,
...
...
@@ -950,8 +950,29 @@ void qWorkerDestroy(void **qWorkerMgmt) {
}
}
int64_t
qWorkerGetWaitTimeInQueue
(
void
*
qWorkerMgmt
,
EQueueType
type
)
{
return
qwGetWaitTimeInQueue
((
SQWorker
*
)
qWorkerMgmt
,
type
);
int32_t
qWorkerGetStat
(
SReadHandle
*
handle
,
void
*
qWorkerMgmt
,
SQWorkerStat
*
pStat
)
{
if
(
NULL
==
handle
||
NULL
==
qWorkerMgmt
||
NULL
==
pStat
)
{
QW_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
SDataSinkStat
sinkStat
=
{
0
};
dsDataSinkGetCacheSize
(
&
sinkStat
);
pStat
->
cacheDataSize
=
sinkStat
.
cachedSize
;
pStat
->
queryProcessed
=
QW_STAT_GET
(
mgmt
->
stat
.
msgStat
.
queryProcessed
);
pStat
->
cqueryProcessed
=
QW_STAT_GET
(
mgmt
->
stat
.
msgStat
.
cqueryProcessed
);
pStat
->
fetchProcessed
=
QW_STAT_GET
(
mgmt
->
stat
.
msgStat
.
fetchProcessed
);
pStat
->
dropProcessed
=
QW_STAT_GET
(
mgmt
->
stat
.
msgStat
.
dropProcessed
);
pStat
->
hbProcessed
=
QW_STAT_GET
(
mgmt
->
stat
.
msgStat
.
hbProcessed
);
pStat
->
numOfQueryInQueue
=
handle
->
pMsgCb
->
qsizeFp
(
handle
->
pMsgCb
->
mgmt
,
mgmt
->
nodeId
,
QUERY_QUEUE
);
pStat
->
numOfFetchInQueue
=
handle
->
pMsgCb
->
qsizeFp
(
handle
->
pMsgCb
->
mgmt
,
mgmt
->
nodeId
,
FETCH_QUEUE
);
pStat
->
timeInQueryQueue
=
qwGetTimeInQueue
((
SQWorker
*
)
qWorkerMgmt
,
QUERY_QUEUE
);
pStat
->
timeInFetchQueue
=
qwGetTimeInQueue
((
SQWorker
*
)
qWorkerMgmt
,
FETCH_QUEUE
);
return
TSDB_CODE_SUCCESS
;
}
...
...
source/libs/scheduler/src/schJob.c
浏览文件 @
496ae4ca
...
...
@@ -469,6 +469,34 @@ _return:
SCH_RET
(
code
);
}
int32_t
schSetAddrsFromNodeList
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
int32_t
addNum
=
0
;
int32_t
nodeNum
=
0
;
if
(
pJob
->
nodeList
)
{
nodeNum
=
taosArrayGetSize
(
pJob
->
nodeList
);
for
(
int32_t
i
=
0
;
i
<
nodeNum
&&
addNum
<
SCH_MAX_CANDIDATE_EP_NUM
;
++
i
)
{
SQueryNodeAddr
*
naddr
=
taosArrayGet
(
pJob
->
nodeList
,
i
);
if
(
NULL
==
taosArrayPush
(
pTask
->
candidateAddrs
,
naddr
))
{
SCH_TASK_ELOG
(
"taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d"
,
addNum
,
errno
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
++
addNum
;
}
}
if
(
addNum
<=
0
)
{
SCH_TASK_ELOG
(
"no available execNode as candidates, nodeNum:%d"
,
nodeNum
);
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
schSetTaskCandidateAddrs
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
if
(
NULL
!=
pTask
->
candidateAddrs
)
{
return
TSDB_CODE_SUCCESS
;
...
...
@@ -492,27 +520,7 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
addNum
=
0
;
int32_t
nodeNum
=
0
;
if
(
pJob
->
nodeList
)
{
nodeNum
=
taosArrayGetSize
(
pJob
->
nodeList
);
for
(
int32_t
i
=
0
;
i
<
nodeNum
&&
addNum
<
SCH_MAX_CANDIDATE_EP_NUM
;
++
i
)
{
SQueryNodeAddr
*
naddr
=
taosArrayGet
(
pJob
->
nodeList
,
i
);
if
(
NULL
==
taosArrayPush
(
pTask
->
candidateAddrs
,
naddr
))
{
SCH_TASK_ELOG
(
"taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d"
,
addNum
,
errno
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
++
addNum
;
}
}
if
(
addNum
<=
0
)
{
SCH_TASK_ELOG
(
"no available execNode as candidates, nodeNum:%d"
,
nodeNum
);
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
SCH_ERR_RET
(
schSetAddrsFromNodeList
(
pJob
,
pTask
));
/*
for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) {
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
496ae4ca
...
...
@@ -141,7 +141,7 @@ int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) {
if
(
pJob
->
status
<
JOB_TASK_STATUS_NOT_START
||
pJob
->
levelNum
<=
0
||
NULL
==
pJob
->
levels
)
{
qDebug
(
"job not initialized or not executable job, refId:%"
PRIx64
,
job
);
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
SCH_ERR_
J
RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
for
(
int32_t
i
=
pJob
->
levelNum
-
1
;
i
>=
0
;
--
i
)
{
...
...
@@ -155,7 +155,11 @@ int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) {
}
}
return
TSDB_CODE_SUCCESS
;
_return:
schReleaseJob
(
job
);
SCH_RET
(
code
);
}
int32_t
scheduleCancelJob
(
int64_t
job
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录