Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
34511150
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
34511150
编写于
5月 09, 2022
作者:
J
jiacy-jcy
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into test/jcy
上级
154f0c42
05d55b97
变更
17
隐藏空白更改
内联
并排
Showing
17 changed file
with
291 addition
and
262 deletion
+291
-262
cmake/cmake.define
cmake/cmake.define
+1
-1
source/dnode/mgmt/implement/src/dmHandle.c
source/dnode/mgmt/implement/src/dmHandle.c
+4
-0
source/dnode/mgmt/interface/inc/dmDef.h
source/dnode/mgmt/interface/inc/dmDef.h
+2
-0
source/dnode/mgmt/interface/src/dmEnv.c
source/dnode/mgmt/interface/src/dmEnv.c
+1
-0
source/dnode/mgmt/mgmt_vnode/inc/vmInt.h
source/dnode/mgmt/mgmt_vnode/inc/vmInt.h
+10
-9
source/dnode/mgmt/mgmt_vnode/src/vmFile.c
source/dnode/mgmt/mgmt_vnode/src/vmFile.c
+16
-16
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
+76
-52
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
+16
-67
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
+80
-90
source/dnode/mnode/sdb/src/sdbFile.c
source/dnode/mnode/sdb/src/sdbFile.c
+1
-1
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+1
-1
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+7
-4
source/libs/function/CMakeLists.txt
source/libs/function/CMakeLists.txt
+2
-2
source/libs/transport/src/transSrv.c
source/libs/transport/src/transSrv.c
+65
-14
source/os/src/osDir.c
source/os/src/osDir.c
+1
-1
source/os/src/osFile.c
source/os/src/osFile.c
+1
-1
source/os/src/osSysinfo.c
source/os/src/osSysinfo.c
+7
-3
未找到文件。
cmake/cmake.define
浏览文件 @
34511150
...
...
@@ -66,7 +66,7 @@ ENDIF ()
IF (TD_WINDOWS)
MESSAGE("${Yellow} set compiler flag for Windows! ${ColourReset}")
SET(COMMON_FLAGS "/W3 /D_WIN32")
SET(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} /MANIFEST:NO")
# IF (MSVC AND (MSVC_VERSION GREATER_EQUAL 1900))
# SET(COMMON_FLAGS "${COMMON_FLAGS} /Wv:18")
# ENDIF ()
...
...
source/dnode/mgmt/implement/src/dmHandle.c
浏览文件 @
34511150
...
...
@@ -241,7 +241,11 @@ static int32_t dmSpawnUdfd(SDnode *pDnode) {
strncpy
(
path
,
tsProcPath
,
strlen
(
tsProcPath
));
taosDirName
(
path
);
}
#ifdef WINDOWS
strcat
(
path
,
"udfd.exe"
);
#else
strcat
(
path
,
"/udfd"
);
#endif
char
*
argsUdfd
[]
=
{
path
,
"-c"
,
configDir
,
NULL
};
options
.
args
=
argsUdfd
;
options
.
file
=
path
;
...
...
source/dnode/mgmt/interface/inc/dmDef.h
浏览文件 @
34511150
...
...
@@ -41,6 +41,8 @@
#include "monitor.h"
#include "sync.h"
#include "libs/function/function.h"
#ifdef __cplusplus
extern
"C"
{
#endif
...
...
source/dnode/mgmt/interface/src/dmEnv.c
浏览文件 @
34511150
...
...
@@ -55,6 +55,7 @@ void dmCleanup() {
monCleanup
();
syncCleanUp
();
walCleanUp
();
udfcClose
();
taosStopCacheRefreshWorker
();
dInfo
(
"dnode env is cleaned up"
);
}
source/dnode/mgmt/mgmt_vnode/inc/vmInt.h
浏览文件 @
34511150
...
...
@@ -29,15 +29,15 @@ typedef struct SVnodesMgmt {
SHashObj
*
hash
;
SRWLatch
latch
;
SVnodesStat
state
;
const
char
*
path
;
SDnode
*
pDnode
;
SMgmtWrapper
*
pWrapper
;
STfs
*
pTfs
;
SQWorkerPool
queryPool
;
SQWorkerPool
fetchPool
;
SWWorkerPool
syncPool
;
SWWorkerPool
writePool
;
SWWorkerPool
mergePool
;
const
char
*
path
;
SDnode
*
pDnode
;
SMgmtWrapper
*
pWrapper
;
SSingleWorker
mgmtWorker
;
SSingleWorker
monitorWorker
;
}
SVnodesMgmt
;
...
...
@@ -95,9 +95,9 @@ int32_t vmProcessGetVnodeLoadsReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq);
void
vmGetVnodeLoads
(
SMgmtWrapper
*
pWrapper
,
SMonVloadInfo
*
pInfo
);
// vmFile.c
int32_t
vmGetVnode
s
FromFile
(
SVnodesMgmt
*
pMgmt
,
SWrapperCfg
**
ppCfgs
,
int32_t
*
numOfVnodes
);
int32_t
vmWriteVnode
s
ToFile
(
SVnodesMgmt
*
pMgmt
);
SVnodeObj
**
vmGetVnode
s
FromHash
(
SVnodesMgmt
*
pMgmt
,
int32_t
*
numOfVnodes
);
int32_t
vmGetVnode
List
FromFile
(
SVnodesMgmt
*
pMgmt
,
SWrapperCfg
**
ppCfgs
,
int32_t
*
numOfVnodes
);
int32_t
vmWriteVnode
List
ToFile
(
SVnodesMgmt
*
pMgmt
);
SVnodeObj
**
vmGetVnode
List
FromHash
(
SVnodesMgmt
*
pMgmt
,
int32_t
*
numOfVnodes
);
// vmWorker.c
int32_t
vmStartWorker
(
SVnodesMgmt
*
pMgmt
);
...
...
@@ -105,11 +105,12 @@ void vmStopWorker(SVnodesMgmt *pMgmt);
int32_t
vmAllocQueue
(
SVnodesMgmt
*
pMgmt
,
SVnodeObj
*
pVnode
);
void
vmFreeQueue
(
SVnodesMgmt
*
pMgmt
,
SVnodeObj
*
pVnode
);
int32_t
vmPutMsgToSyncQueue
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pRpc
);
// sync integration
int32_t
vmPutMsgToWriteQueue
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pRpc
);
int32_t
vmPutMsgToWriteQueue
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pMsg
);
int32_t
vmPutMsgToSyncQueue
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pMsg
);
int32_t
vmPutMsgToApplyQueue
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pMsg
);
int32_t
vmPutMsgToQueryQueue
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pMsg
);
int32_t
vmPutMsgToFetchQueue
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pMsg
);
int32_t
vmPutMsgTo
Apply
Queue
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pMsg
);
int32_t
vmPutMsgTo
Merge
Queue
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pMsg
);
int32_t
vmGetQueueSize
(
SMgmtWrapper
*
pWrapper
,
int32_t
vgId
,
EQueueType
qtype
);
int32_t
vmProcessWriteMsg
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
);
...
...
source/dnode/mgmt/mgmt_vnode/src/vmFile.c
浏览文件 @
34511150
...
...
@@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE
#include "vmInt.h"
SVnodeObj
**
vmGetVnode
s
FromHash
(
SVnodesMgmt
*
pMgmt
,
int32_t
*
numOfVnodes
)
{
SVnodeObj
**
vmGetVnode
List
FromHash
(
SVnodesMgmt
*
pMgmt
,
int32_t
*
numOfVnodes
)
{
taosRLockLatch
(
&
pMgmt
->
latch
);
int32_t
num
=
0
;
...
...
@@ -44,14 +44,14 @@ SVnodeObj **vmGetVnodesFromHash(SVnodesMgmt *pMgmt, int32_t *numOfVnodes) {
return
pVnodes
;
}
int32_t
vmGetVnode
s
FromFile
(
SVnodesMgmt
*
pMgmt
,
SWrapperCfg
**
ppCfgs
,
int32_t
*
numOfVnodes
)
{
int32_t
vmGetVnode
List
FromFile
(
SVnodesMgmt
*
pMgmt
,
SWrapperCfg
**
ppCfgs
,
int32_t
*
numOfVnodes
)
{
int32_t
code
=
TSDB_CODE_INVALID_JSON_FORMAT
;
int32_t
len
=
0
;
int32_t
maxLen
=
30000
;
char
*
content
=
taosMemoryCalloc
(
1
,
maxLen
+
1
);
cJSON
*
root
=
NULL
;
FILE
*
fp
=
NULL
;
char
file
[
PATH_MAX
];
char
file
[
PATH_MAX
]
=
{
0
}
;
SWrapperCfg
*
pCfgs
=
NULL
;
TdFilePtr
pFile
=
NULL
;
...
...
@@ -61,26 +61,26 @@ int32_t vmGetVnodesFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *n
if
(
pFile
==
NULL
)
{
dDebug
(
"file %s not exist"
,
file
);
code
=
0
;
goto
PRASE_VNODE
_OVER
;
goto
_OVER
;
}
len
=
(
int32_t
)
taosReadFile
(
pFile
,
content
,
maxLen
);
if
(
len
<=
0
)
{
dError
(
"failed to read %s since content is null"
,
file
);
goto
PRASE_VNODE
_OVER
;
goto
_OVER
;
}
content
[
len
]
=
0
;
root
=
cJSON_Parse
(
content
);
if
(
root
==
NULL
)
{
dError
(
"failed to read %s since invalid json format"
,
file
);
goto
PRASE_VNODE
_OVER
;
goto
_OVER
;
}
cJSON
*
vnodes
=
cJSON_GetObjectItem
(
root
,
"vnodes"
);
if
(
!
vnodes
||
vnodes
->
type
!=
cJSON_Array
)
{
dError
(
"failed to read %s since vnodes not found"
,
file
);
goto
PRASE_VNODE
_OVER
;
goto
_OVER
;
}
int32_t
vnodesNum
=
cJSON_GetArraySize
(
vnodes
);
...
...
@@ -88,7 +88,7 @@ int32_t vmGetVnodesFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *n
pCfgs
=
taosMemoryCalloc
(
vnodesNum
,
sizeof
(
SWrapperCfg
));
if
(
pCfgs
==
NULL
)
{
dError
(
"failed to read %s since out of memory"
,
file
);
goto
PRASE_VNODE
_OVER
;
goto
_OVER
;
}
for
(
int32_t
i
=
0
;
i
<
vnodesNum
;
++
i
)
{
...
...
@@ -98,7 +98,7 @@ int32_t vmGetVnodesFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *n
cJSON
*
vgId
=
cJSON_GetObjectItem
(
vnode
,
"vgId"
);
if
(
!
vgId
||
vgId
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since vgId not found"
,
file
);
goto
PRASE_VNODE
_OVER
;
goto
_OVER
;
}
pCfg
->
vgId
=
vgId
->
valueint
;
snprintf
(
pCfg
->
path
,
sizeof
(
pCfg
->
path
),
"%s%svnode%d"
,
pMgmt
->
path
,
TD_DIRSEP
,
pCfg
->
vgId
);
...
...
@@ -106,28 +106,28 @@ int32_t vmGetVnodesFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *n
cJSON
*
dropped
=
cJSON_GetObjectItem
(
vnode
,
"dropped"
);
if
(
!
dropped
||
dropped
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since dropped not found"
,
file
);
goto
PRASE_VNODE
_OVER
;
goto
_OVER
;
}
pCfg
->
dropped
=
dropped
->
valueint
;
cJSON
*
vgVersion
=
cJSON_GetObjectItem
(
vnode
,
"vgVersion"
);
if
(
!
vgVersion
||
vgVersion
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since vgVersion not found"
,
file
);
goto
PRASE_VNODE
_OVER
;
goto
_OVER
;
}
pCfg
->
vgVersion
=
vgVersion
->
valueint
;
cJSON
*
dbUid
=
cJSON_GetObjectItem
(
vnode
,
"dbUid"
);
if
(
!
dbUid
||
dbUid
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s since dbUid not found"
,
file
);
goto
PRASE_VNODE
_OVER
;
goto
_OVER
;
}
pCfg
->
dbUid
=
atoll
(
dbUid
->
valuestring
);
cJSON
*
db
=
cJSON_GetObjectItem
(
vnode
,
"db"
);
if
(
!
db
||
db
->
type
!=
cJSON_String
)
{
dError
(
"failed to read %s since db not found"
,
file
);
goto
PRASE_VNODE
_OVER
;
goto
_OVER
;
}
tstrncpy
(
pCfg
->
db
,
db
->
valuestring
,
TSDB_DB_FNAME_LEN
);
}
...
...
@@ -139,7 +139,7 @@ int32_t vmGetVnodesFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *n
code
=
0
;
dInfo
(
"succcessed to read file %s"
,
file
);
PRASE_VNODE
_OVER:
_OVER:
if
(
content
!=
NULL
)
taosMemoryFree
(
content
);
if
(
root
!=
NULL
)
cJSON_Delete
(
root
);
if
(
pFile
!=
NULL
)
taosCloseFile
(
&
pFile
);
...
...
@@ -148,7 +148,7 @@ PRASE_VNODE_OVER:
return
code
;
}
int32_t
vmWriteVnode
s
ToFile
(
SVnodesMgmt
*
pMgmt
)
{
int32_t
vmWriteVnode
List
ToFile
(
SVnodesMgmt
*
pMgmt
)
{
char
file
[
PATH_MAX
];
char
realfile
[
PATH_MAX
];
snprintf
(
file
,
sizeof
(
file
),
"%s%svnodes.json.bak"
,
pMgmt
->
path
,
TD_DIRSEP
);
...
...
@@ -162,7 +162,7 @@ int32_t vmWriteVnodesToFile(SVnodesMgmt *pMgmt) {
}
int32_t
numOfVnodes
=
0
;
SVnodeObj
**
pVnodes
=
vmGetVnode
s
FromHash
(
pMgmt
,
&
numOfVnodes
);
SVnodeObj
**
pVnodes
=
vmGetVnode
List
FromHash
(
pMgmt
,
&
numOfVnodes
);
int32_t
len
=
0
;
int32_t
maxLen
=
65536
;
...
...
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
浏览文件 @
34511150
...
...
@@ -16,12 +16,37 @@
#define _DEFAULT_SOURCE
#include "vmInt.h"
void
vmGetVnodeLoads
(
SMgmtWrapper
*
pWrapper
,
SMonVloadInfo
*
pInfo
)
{
SVnodesMgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
pInfo
->
pVloads
=
taosArrayInit
(
pMgmt
->
state
.
totalVnodes
,
sizeof
(
SVnodeLoad
));
if
(
pInfo
->
pVloads
==
NULL
)
return
;
taosRLockLatch
(
&
pMgmt
->
latch
);
void
*
pIter
=
taosHashIterate
(
pMgmt
->
hash
,
NULL
);
while
(
pIter
)
{
SVnodeObj
**
ppVnode
=
pIter
;
if
(
ppVnode
==
NULL
||
*
ppVnode
==
NULL
)
continue
;
SVnodeObj
*
pVnode
=
*
ppVnode
;
SVnodeLoad
vload
=
{
0
};
vnodeGetLoad
(
pVnode
->
pImpl
,
&
vload
);
taosArrayPush
(
pInfo
->
pVloads
,
&
vload
);
pIter
=
taosHashIterate
(
pMgmt
->
hash
,
pIter
);
}
taosRUnLockLatch
(
&
pMgmt
->
latch
);
}
void
vmGetMonitorInfo
(
SMgmtWrapper
*
pWrapper
,
SMonVmInfo
*
pInfo
)
{
SVnodesMgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
SMonVloadInfo
vloads
=
{
0
};
vmGetVnodeLoads
(
pWrapper
,
&
vloads
);
if
(
vloads
.
pVloads
==
NULL
)
return
;
SArray
*
pVloads
=
vloads
.
pVloads
;
if
(
pVloads
==
NULL
)
return
;
int32_t
totalVnodes
=
0
;
int32_t
masterNum
=
0
;
...
...
@@ -31,8 +56,8 @@ void vmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonVmInfo *pInfo) {
int64_t
numOfBatchInsertReqs
=
0
;
int64_t
numOfBatchInsertSuccessReqs
=
0
;
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
vloads
.
pVloads
);
++
i
)
{
SVnodeLoad
*
pLoad
=
taosArrayGet
(
vloads
.
pVloads
,
i
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pVloads
);
++
i
)
{
SVnodeLoad
*
pLoad
=
taosArrayGet
(
pVloads
,
i
);
numOfSelectReqs
+=
pLoad
->
numOfSelectReqs
;
numOfInsertReqs
+=
pLoad
->
numOfInsertReqs
;
numOfInsertSuccessReqs
+=
pLoad
->
numOfInsertSuccessReqs
;
...
...
@@ -49,10 +74,16 @@ void vmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonVmInfo *pInfo) {
pInfo
->
vstat
.
numOfInsertSuccessReqs
=
numOfInsertSuccessReqs
-
pMgmt
->
state
.
numOfInsertSuccessReqs
;
pInfo
->
vstat
.
numOfBatchInsertReqs
=
numOfBatchInsertReqs
-
pMgmt
->
state
.
numOfBatchInsertReqs
;
pInfo
->
vstat
.
numOfBatchInsertSuccessReqs
=
numOfBatchInsertSuccessReqs
-
pMgmt
->
state
.
numOfBatchInsertSuccessReqs
;
pMgmt
->
state
=
pInfo
->
vstat
;
pMgmt
->
state
.
totalVnodes
=
totalVnodes
;
pMgmt
->
state
.
masterNum
=
masterNum
;
pMgmt
->
state
.
numOfSelectReqs
=
numOfSelectReqs
;
pMgmt
->
state
.
numOfInsertReqs
=
numOfInsertReqs
;
pMgmt
->
state
.
numOfInsertSuccessReqs
=
numOfInsertSuccessReqs
;
pMgmt
->
state
.
numOfBatchInsertReqs
=
numOfBatchInsertReqs
;
pMgmt
->
state
.
numOfBatchInsertSuccessReqs
=
numOfBatchInsertSuccessReqs
;
tfsGetMonitorInfo
(
pMgmt
->
pTfs
,
&
pInfo
->
tfs
);
taosArrayDestroy
(
vloads
.
pVloads
);
taosArrayDestroy
(
pVloads
);
}
int32_t
vmProcessGetMonVmInfoReq
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pReq
)
{
...
...
@@ -107,12 +138,13 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
memcpy
(
pCfg
,
&
vnodeCfgDefault
,
sizeof
(
SVnodeCfg
));
pCfg
->
vgId
=
pCreate
->
vgId
;
strcpy
(
pCfg
->
dbname
,
pCreate
->
db
);
tstrncpy
(
pCfg
->
dbname
,
pCreate
->
db
,
sizeof
(
pCfg
->
dbname
));
pCfg
->
dbId
=
pCreate
->
dbUid
;
pCfg
->
isWeak
=
true
;
pCfg
->
tsdbCfg
.
days
=
10
;
pCfg
->
tsdbCfg
.
keep2
=
3650
;
pCfg
->
tsdbCfg
.
keep0
=
3650
;
pCfg
->
tsdbCfg
.
keep1
=
3650
;
pCfg
->
tsdbCfg
.
keep2
=
3650
;
for
(
size_t
i
=
0
;
i
<
taosArrayGetSize
(
pCreate
->
pRetensions
);
++
i
)
{
memcpy
(
&
pCfg
->
tsdbCfg
.
retentions
[
i
],
taosArrayGet
(
pCreate
->
pRetensions
,
i
),
sizeof
(
SRetention
));
}
...
...
@@ -121,30 +153,30 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
pCfg
->
hashEnd
=
pCreate
->
hashEnd
;
pCfg
->
hashMethod
=
pCreate
->
hashMethod
;
// sync integration
pCfg
->
syncCfg
.
myIndex
=
pCreate
->
selfIndex
;
pCfg
->
syncCfg
.
replicaNum
=
pCreate
->
replica
;
memset
(
&
(
pCfg
->
syncCfg
.
nodeInfo
)
,
0
,
sizeof
(
pCfg
->
syncCfg
.
nodeInfo
));
memset
(
&
pCfg
->
syncCfg
.
nodeInfo
,
0
,
sizeof
(
pCfg
->
syncCfg
.
nodeInfo
));
for
(
int
i
=
0
;
i
<
pCreate
->
replica
;
++
i
)
{
(
pCfg
->
syncCfg
.
nodeInfo
)[
i
].
nodePort
=
(
pCreate
->
replicas
)
[
i
].
port
;
snprintf
(
(
pCfg
->
syncCfg
.
nodeInfo
)[
i
].
nodeFqdn
,
sizeof
((
pCfg
->
syncCfg
.
nodeInfo
)
[
i
].
nodeFqdn
),
"%s"
,
(
pCreate
->
replicas
)
[
i
].
fqdn
);
pCfg
->
syncCfg
.
nodeInfo
[
i
].
nodePort
=
pCreate
->
replicas
[
i
].
port
;
snprintf
(
pCfg
->
syncCfg
.
nodeInfo
[
i
].
nodeFqdn
,
sizeof
(
pCfg
->
syncCfg
.
nodeInfo
[
i
].
nodeFqdn
),
"%s"
,
pCreate
->
replicas
[
i
].
fqdn
);
}
}
static
void
vmGenerateWrapperCfg
(
SVnodesMgmt
*
pMgmt
,
SCreateVnodeReq
*
pCreate
,
SWrapperCfg
*
pCfg
)
{
memcpy
(
pCfg
->
db
,
pCreate
->
db
,
TSDB_DB_FNAME_LEN
);
pCfg
->
dbUid
=
pCreate
->
dbUid
;
pCfg
->
dropped
=
0
;
snprintf
(
pCfg
->
path
,
sizeof
(
pCfg
->
path
),
"%s%svnode%d"
,
pMgmt
->
path
,
TD_DIRSEP
,
pCreate
->
vgId
);
pCfg
->
vgId
=
pCreate
->
vgId
;
pCfg
->
vgVersion
=
pCreate
->
vgVersion
;
pCfg
->
dropped
=
0
;
pCfg
->
dbUid
=
pCreate
->
dbUid
;
tstrncpy
(
pCfg
->
db
,
pCreate
->
db
,
TSDB_DB_FNAME_LEN
);
snprintf
(
pCfg
->
path
,
sizeof
(
pCfg
->
path
),
"%s%svnode%d"
,
pMgmt
->
path
,
TD_DIRSEP
,
pCreate
->
vgId
);
}
int32_t
vmProcessCreateVnodeReq
(
SVnodesMgmt
*
pMgmt
,
SNodeMsg
*
pMsg
)
{
SRpcMsg
*
pReq
=
&
pMsg
->
rpcMsg
;
SCreateVnodeReq
createReq
=
{
0
};
char
path
[
TSDB_FILENAME_LEN
];
int32_t
code
=
-
1
;
char
path
[
TSDB_FILENAME_LEN
]
=
{
0
};
if
(
tDeserializeSCreateVnodeReq
(
pReq
->
pCont
,
pReq
->
contLen
,
&
createReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
...
...
@@ -161,14 +193,13 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
SVnodeObj
*
pVnode
=
vmAcquireVnode
(
pMgmt
,
createReq
.
vgId
);
if
(
pVnode
!=
NULL
)
{
tFreeSCreateVnodeReq
(
&
createReq
);
dDebug
(
"vgId:%d, already exist"
,
createReq
.
vgId
);
tFreeSCreateVnodeReq
(
&
createReq
);
vmReleaseVnode
(
pMgmt
,
pVnode
);
terrno
=
TSDB_CODE_NODE_ALREADY_DEPLOYED
;
return
-
1
;
}
// create vnode
snprintf
(
path
,
TSDB_FILENAME_LEN
,
"vnode%svnode%d"
,
TD_DIRSEP
,
vnodeCfg
.
vgId
);
if
(
vnodeCreate
(
path
,
&
vnodeCfg
,
pMgmt
->
pTfs
)
<
0
)
{
tFreeSCreateVnodeReq
(
&
createReq
);
...
...
@@ -179,49 +210,43 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
SMsgCb
msgCb
=
pMgmt
->
pDnode
->
data
.
msgCb
;
msgCb
.
pWrapper
=
pMgmt
->
pWrapper
;
msgCb
.
queueFps
[
WRITE_QUEUE
]
=
vmPutMsgToWriteQueue
;
msgCb
.
queueFps
[
SYNC_QUEUE
]
=
vmPutMsgToSyncQueue
;
msgCb
.
queueFps
[
APPLY_QUEUE
]
=
vmPutMsgToApplyQueue
;
msgCb
.
queueFps
[
QUERY_QUEUE
]
=
vmPutMsgToQueryQueue
;
msgCb
.
queueFps
[
FETCH_QUEUE
]
=
vmPutMsgToFetchQueue
;
msgCb
.
queueFps
[
APPLY_QUEUE
]
=
vmPutMsgToApplyQueue
;
msgCb
.
queueFps
[
SYNC_QUEUE
]
=
vmPutMsgToSyncQueue
;
// sync integration
msgCb
.
queueFps
[
MERGE_QUEUE
]
=
vmPutMsgToMergeQueue
;
msgCb
.
qsizeFp
=
vmGetQueueSize
;
SVnode
*
pImpl
=
vnodeOpen
(
path
,
pMgmt
->
pTfs
,
msgCb
);
if
(
pImpl
==
NULL
)
{
dError
(
"vgId:%d, failed to create vnode since %s"
,
createReq
.
vgId
,
terrstr
());
tFreeSCreateVnodeReq
(
&
createReq
);
return
-
1
;
goto
_OVER
;
}
int32_t
code
=
vmOpenVnode
(
pMgmt
,
&
wrapperCfg
,
pImpl
);
code
=
vmOpenVnode
(
pMgmt
,
&
wrapperCfg
,
pImpl
);
if
(
code
!=
0
)
{
tFreeSCreateVnodeReq
(
&
createReq
);
dError
(
"vgId:%d, failed to open vnode since %s"
,
createReq
.
vgId
,
terrstr
());
vnodeClose
(
pImpl
);
vnodeDestroy
(
path
,
pMgmt
->
pTfs
);
terrno
=
code
;
return
code
;
goto
_OVER
;
}
code
=
vnodeStart
(
pImpl
);
if
(
code
!=
0
)
{
tFreeSCreateVnodeReq
(
&
createReq
);
dError
(
"vgId:%d, failed to start sync since %s"
,
createReq
.
vgId
,
terrstr
());
vnodeClose
(
pImpl
);
vnodeDestroy
(
path
,
pMgmt
->
pTfs
);
terrno
=
code
;
return
code
;
goto
_OVER
;
}
code
=
vmWriteVnodesToFile
(
pMgmt
);
code
=
vmWriteVnodeListToFile
(
pMgmt
);
if
(
code
!=
0
)
goto
_OVER
;
_OVER:
if
(
code
!=
0
)
{
tFreeSCreateVnodeReq
(
&
createReq
);
vnodeClose
(
pImpl
);
vnodeDestroy
(
path
,
pMgmt
->
pTfs
);
terrno
=
code
;
return
code
;
}
return
0
;
tFreeSCreateVnodeReq
(
&
createReq
);
terrno
=
code
;
return
code
;
}
int32_t
vmProcessDropVnodeReq
(
SVnodesMgmt
*
pMgmt
,
SNodeMsg
*
pMsg
)
{
...
...
@@ -243,14 +268,14 @@ int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
}
pVnode
->
dropped
=
1
;
if
(
vmWriteVnode
s
ToFile
(
pMgmt
)
!=
0
)
{
if
(
vmWriteVnode
List
ToFile
(
pMgmt
)
!=
0
)
{
pVnode
->
dropped
=
0
;
vmReleaseVnode
(
pMgmt
,
pVnode
);
return
-
1
;
}
vmCloseVnode
(
pMgmt
,
pVnode
);
vmWriteVnode
s
ToFile
(
pMgmt
);
vmWriteVnode
List
ToFile
(
pMgmt
);
return
0
;
}
...
...
@@ -287,7 +312,7 @@ void vmInitMsgHandle(SMgmtWrapper *pWrapper) {
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_CANCEL_SMA
,
vmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_DROP_SMA
,
vmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_SUBMIT_RSMA
,
vmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_MQ_VG_CHANGE
,
(
NodeMsgFp
)
vmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_MQ_VG_CHANGE
,
vmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_CONSUME
,
vmProcessFetchMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_TASK_DEPLOY
,
vmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_QUERY_HEARTBEAT
,
vmProcessFetchMsg
,
DEFAULT_HANDLE
);
...
...
@@ -300,14 +325,13 @@ void vmInitMsgHandle(SMgmtWrapper *pWrapper) {
dmSetMsgHandle
(
pWrapper
,
TDMT_DND_CREATE_VNODE
,
vmProcessMgmtMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_DND_DROP_VNODE
,
vmProcessMgmtMsg
,
DEFAULT_HANDLE
);
// sync integration
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_SYNC_TIMEOUT
,
(
NodeMsgFp
)
vmProcessSyncMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_SYNC_PING
,
(
NodeMsgFp
)
vmProcessSyncMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_SYNC_PING_REPLY
,
(
NodeMsgFp
)
vmProcessSyncMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_SYNC_CLIENT_REQUEST
,
(
NodeMsgFp
)
vmProcessSyncMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_SYNC_CLIENT_REQUEST_REPLY
,
(
NodeMsgFp
)
vmProcessSyncMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_SYNC_REQUEST_VOTE
,
(
NodeMsgFp
)
vmProcessSyncMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_SYNC_REQUEST_VOTE_REPLY
,
(
NodeMsgFp
)
vmProcessSyncMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_SYNC_APPEND_ENTRIES
,
(
NodeMsgFp
)
vmProcessSyncMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_SYNC_APPEND_ENTRIES_REPLY
,
(
NodeMsgFp
)
vmProcessSyncMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_SYNC_TIMEOUT
,
vmProcessSyncMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_SYNC_PING
,
vmProcessSyncMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_SYNC_PING_REPLY
,
vmProcessSyncMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_SYNC_CLIENT_REQUEST
,
vmProcessSyncMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_SYNC_CLIENT_REQUEST_REPLY
,
vmProcessSyncMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_SYNC_REQUEST_VOTE
,
vmProcessSyncMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_SYNC_REQUEST_VOTE_REPLY
,
vmProcessSyncMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_SYNC_APPEND_ENTRIES
,
vmProcessSyncMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_SYNC_APPEND_ENTRIES_REPLY
,
vmProcessSyncMsg
,
DEFAULT_HANDLE
);
}
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
浏览文件 @
34511150
...
...
@@ -15,7 +15,6 @@
#define _DEFAULT_SOURCE
#include "vmInt.h"
#include "libs/function/function.h"
SVnodeObj
*
vmAcquireVnode
(
SVnodesMgmt
*
pMgmt
,
int32_t
vgId
)
{
SVnodeObj
*
pVnode
=
NULL
;
...
...
@@ -55,14 +54,14 @@ int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
pVnode
->
vgId
=
pCfg
->
vgId
;
pVnode
->
refCount
=
0
;
pVnode
->
vgVersion
=
pCfg
->
vgVersion
;
pVnode
->
dropped
=
0
;
pVnode
->
accessState
=
TSDB_VN_ALL_ACCCESS
;
pVnode
->
pWrapper
=
pMgmt
->
pWrapper
;
pVnode
->
pImpl
=
pImpl
;
pVnode
->
vgVersion
=
pCfg
->
vgVersion
;
pVnode
->
dbUid
=
pCfg
->
dbUid
;
pVnode
->
db
=
tstrdup
(
pCfg
->
db
);
pVnode
->
path
=
tstrdup
(
pCfg
->
path
);
pVnode
->
pImpl
=
pImpl
;
pVnode
->
pWrapper
=
pMgmt
->
pWrapper
;
if
(
pVnode
->
path
==
NULL
||
pVnode
->
db
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -78,14 +77,11 @@ int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
int32_t
code
=
taosHashPut
(
pMgmt
->
hash
,
&
pVnode
->
vgId
,
sizeof
(
int32_t
),
&
pVnode
,
sizeof
(
SVnodeObj
*
));
taosWUnLockLatch
(
&
pMgmt
->
latch
);
if
(
code
!=
0
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
}
return
code
;
}
void
vmCloseVnode
(
SVnodesMgmt
*
pMgmt
,
SVnodeObj
*
pVnode
)
{
char
path
[
TSDB_FILENAME_LEN
];
char
path
[
TSDB_FILENAME_LEN
]
=
{
0
}
;
taosWLockLatch
(
&
pMgmt
->
latch
);
taosHashRemove
(
pMgmt
->
hash
,
&
pVnode
->
vgId
,
sizeof
(
int32_t
));
...
...
@@ -98,6 +94,7 @@ void vmCloseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
while
(
!
taosQueueEmpty
(
pVnode
->
pApplyQ
))
taosMsleep
(
10
);
while
(
!
taosQueueEmpty
(
pVnode
->
pQueryQ
))
taosMsleep
(
10
);
while
(
!
taosQueueEmpty
(
pVnode
->
pFetchQ
))
taosMsleep
(
10
);
while
(
!
taosQueueEmpty
(
pVnode
->
pMergeQ
))
taosMsleep
(
10
);
vmFreeQueue
(
pMgmt
,
pVnode
);
vnodeClose
(
pVnode
->
pImpl
);
...
...
@@ -116,7 +113,7 @@ void vmCloseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
taosMemoryFree
(
pVnode
);
}
static
void
*
vmOpenVnode
Func
(
void
*
param
)
{
static
void
*
vmOpenVnode
InThread
(
void
*
param
)
{
SVnodeThread
*
pThread
=
param
;
SVnodesMgmt
*
pMgmt
=
pThread
->
pMgmt
;
SDnode
*
pDnode
=
pMgmt
->
pDnode
;
...
...
@@ -136,10 +133,11 @@ static void *vmOpenVnodeFunc(void *param) {
SMsgCb
msgCb
=
pMgmt
->
pDnode
->
data
.
msgCb
;
msgCb
.
pWrapper
=
pMgmt
->
pWrapper
;
msgCb
.
queueFps
[
WRITE_QUEUE
]
=
vmPutMsgToWriteQueue
;
msgCb
.
queueFps
[
SYNC_QUEUE
]
=
vmPutMsgToSyncQueue
;
msgCb
.
queueFps
[
APPLY_QUEUE
]
=
vmPutMsgToApplyQueue
;
msgCb
.
queueFps
[
QUERY_QUEUE
]
=
vmPutMsgToQueryQueue
;
msgCb
.
queueFps
[
FETCH_QUEUE
]
=
vmPutMsgToFetchQueue
;
msgCb
.
queueFps
[
APPLY_QUEUE
]
=
vmPutMsgToApplyQueue
;
msgCb
.
queueFps
[
SYNC_QUEUE
]
=
vmPutMsgToSyncQueue
;
// sync integration
msgCb
.
queueFps
[
MERGE_QUEUE
]
=
vmPutMsgToMergeQueue
;
msgCb
.
qsizeFp
=
vmGetQueueSize
;
snprintf
(
path
,
TSDB_FILENAME_LEN
,
"vnode%svnode%d"
,
TD_DIRSEP
,
pCfg
->
vgId
);
SVnode
*
pImpl
=
vnodeOpen
(
path
,
pMgmt
->
pTfs
,
msgCb
);
...
...
@@ -148,12 +146,10 @@ static void *vmOpenVnodeFunc(void *param) {
pThread
->
failed
++
;
}
else
{
vmOpenVnode
(
pMgmt
,
pCfg
,
pImpl
);
// vnodeStart(pImpl);
dDebug
(
"vgId:%d, is opened by thread:%d"
,
pCfg
->
vgId
,
pThread
->
threadIndex
);
pThread
->
opened
++
;
atomic_add_fetch_32
(
&
pMgmt
->
state
.
openVnodes
,
1
);
}
atomic_add_fetch_32
(
&
pMgmt
->
state
.
openVnodes
,
1
);
}
dDebug
(
"thread:%d, total vnodes:%d, opened:%d failed:%d"
,
pThread
->
threadIndex
,
pThread
->
vnodeNum
,
pThread
->
opened
,
...
...
@@ -163,29 +159,24 @@ static void *vmOpenVnodeFunc(void *param) {
static
int32_t
vmOpenVnodes
(
SVnodesMgmt
*
pMgmt
)
{
SDnode
*
pDnode
=
pMgmt
->
pDnode
;
taosInitRWLatch
(
&
pMgmt
->
latch
);
pMgmt
->
hash
=
taosHashInit
(
TSDB_MIN_VNODES
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_NO_LOCK
);
if
(
pMgmt
->
hash
==
NULL
)
{
dError
(
"failed to init vnode hash"
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
dError
(
"failed to init vnode hash since %s"
,
terrstr
());
return
-
1
;
}
SWrapperCfg
*
pCfgs
=
NULL
;
int32_t
numOfVnodes
=
0
;
if
(
vmGetVnode
s
FromFile
(
pMgmt
,
&
pCfgs
,
&
numOfVnodes
)
!=
0
)
{
if
(
vmGetVnode
List
FromFile
(
pMgmt
,
&
pCfgs
,
&
numOfVnodes
)
!=
0
)
{
dInfo
(
"failed to get vnode list from disk since %s"
,
terrstr
());
return
-
1
;
}
pMgmt
->
state
.
totalVnodes
=
numOfVnodes
;
#if 0
int32_t threadNum = tsNumOfCores;
#else
int32_t
threadNum
=
1
;
#endif
int32_t
threadNum
=
1
;
// tsNumOfCores;
int32_t
vnodesPerThread
=
numOfVnodes
/
threadNum
+
1
;
SVnodeThread
*
threads
=
taosMemoryCalloc
(
threadNum
,
sizeof
(
SVnodeThread
));
...
...
@@ -210,7 +201,7 @@ static int32_t vmOpenVnodes(SVnodesMgmt *pMgmt) {
TdThreadAttr
thAttr
;
taosThreadAttrInit
(
&
thAttr
);
taosThreadAttrSetDetachState
(
&
thAttr
,
PTHREAD_CREATE_JOINABLE
);
if
(
taosThreadCreate
(
&
pThread
->
thread
,
&
thAttr
,
vmOpenVnode
Func
,
pThread
)
!=
0
)
{
if
(
taosThreadCreate
(
&
pThread
->
thread
,
&
thAttr
,
vmOpenVnode
InThread
,
pThread
)
!=
0
)
{
dError
(
"thread:%d, failed to create thread to open vnode, reason:%s"
,
pThread
->
threadIndex
,
strerror
(
errno
));
}
...
...
@@ -240,7 +231,7 @@ static void vmCloseVnodes(SVnodesMgmt *pMgmt) {
dInfo
(
"start to close all vnodes"
);
int32_t
numOfVnodes
=
0
;
SVnodeObj
**
pVnodes
=
vmGetVnode
s
FromHash
(
pMgmt
,
&
numOfVnodes
);
SVnodeObj
**
pVnodes
=
vmGetVnode
List
FromHash
(
pMgmt
,
&
numOfVnodes
);
for
(
int32_t
i
=
0
;
i
<
numOfVnodes
;
++
i
)
{
vmCloseVnode
(
pMgmt
,
pVnodes
[
i
]);
...
...
@@ -267,12 +258,9 @@ static void vmCleanup(SMgmtWrapper *pWrapper) {
vmStopWorker
(
pMgmt
);
vnodeCleanup
();
tfsClose
(
pMgmt
->
pTfs
);
// walCleanUp();
taosMemoryFree
(
pMgmt
);
pWrapper
->
pMgmt
=
NULL
;
// syncCleanUp();
udfcClose
();
dInfo
(
"vnode-mgmt is cleaned up"
);
}
...
...
@@ -313,7 +301,6 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) {
}
dmReportStartup
(
pDnode
,
"vnode-wal"
,
"initialized"
);
// sync integration
if
(
syncInit
()
!=
0
)
{
dError
(
"failed to open sync since %s"
,
terrstr
());
return
-
1
;
...
...
@@ -381,23 +368,7 @@ static int32_t vmStart(SMgmtWrapper *pWrapper) {
}
static
void
vmStop
(
SMgmtWrapper
*
pWrapper
)
{
#if 0
dDebug("vnode-mgmt start to stop");
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
taosRLockLatch(&pMgmt->latch);
void *pIter = taosHashIterate(pMgmt->hash, NULL);
while (pIter) {
SVnodeObj **ppVnode = pIter;
if (ppVnode == NULL || *ppVnode == NULL) continue;
SVnodeObj *pVnode = *ppVnode;
vnodeStop(pVnode->pImpl);
pIter = taosHashIterate(pMgmt->hash, pIter);
}
taosRUnLockLatch(&pMgmt->latch);
#endif
// process inside the vnode
}
void
vmSetMgmtFp
(
SMgmtWrapper
*
pWrapper
)
{
...
...
@@ -413,25 +384,3 @@ void vmSetMgmtFp(SMgmtWrapper *pWrapper) {
pWrapper
->
fp
=
mgmtFp
;
}
void
vmGetVnodeLoads
(
SMgmtWrapper
*
pWrapper
,
SMonVloadInfo
*
pInfo
)
{
SVnodesMgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
pInfo
->
pVloads
=
taosArrayInit
(
pMgmt
->
state
.
totalVnodes
,
sizeof
(
SVnodeLoad
));
if
(
pInfo
->
pVloads
==
NULL
)
return
;
taosRLockLatch
(
&
pMgmt
->
latch
);
void
*
pIter
=
taosHashIterate
(
pMgmt
->
hash
,
NULL
);
while
(
pIter
)
{
SVnodeObj
**
ppVnode
=
pIter
;
if
(
ppVnode
==
NULL
||
*
ppVnode
==
NULL
)
continue
;
SVnodeObj
*
pVnode
=
*
ppVnode
;
SVnodeLoad
vload
=
{
0
};
vnodeGetLoad
(
pVnode
->
pImpl
,
&
vload
);
taosArrayPush
(
pInfo
->
pVloads
,
&
vload
);
pIter
=
taosHashIterate
(
pMgmt
->
hash
,
pIter
);
}
taosRUnLockLatch
(
&
pMgmt
->
latch
);
}
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
浏览文件 @
34511150
...
...
@@ -14,28 +14,29 @@
*/
#define _DEFAULT_SOURCE
#include "vmInt.h"
#include "sync.h"
#include "syncTools.h"
static
inline
void
vmSendRsp
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
,
int32_t
code
)
{
SRpcMsg
rsp
=
{.
handle
=
pMsg
->
rpcMsg
.
handle
,
.
ahandle
=
pMsg
->
rpcMsg
.
ahandle
,
.
refId
=
pMsg
->
rpcMsg
.
refId
,
.
code
=
code
,
.
pCont
=
pMsg
->
pRsp
,
.
contLen
=
pMsg
->
rspLen
};
static
inline
void
vmSendRsp
(
SNodeMsg
*
pMsg
,
int32_t
code
)
{
SRpcMsg
rsp
=
{
.
handle
=
pMsg
->
rpcMsg
.
handle
,
.
ahandle
=
pMsg
->
rpcMsg
.
ahandle
,
.
refId
=
pMsg
->
rpcMsg
.
refId
,
.
code
=
code
,
.
pCont
=
pMsg
->
pRsp
,
.
contLen
=
pMsg
->
rspLen
,
};
tmsgSendRsp
(
&
rsp
);
}
static
void
vmProcessMgmtQueue
(
SQueueInfo
*
pInfo
,
SNodeMsg
*
pMsg
)
{
static
void
vmProcessMgmt
Monitor
Queue
(
SQueueInfo
*
pInfo
,
SNodeMsg
*
pMsg
)
{
SVnodesMgmt
*
pMgmt
=
pInfo
->
ahandle
;
int32_t
code
=
-
1
;
tmsg_t
msgType
=
pMsg
->
rpcMsg
.
msgType
;
dTrace
(
"msg:%p, will be processed in vnode-m queue"
,
pMsg
);
dTrace
(
"msg:%p, will be processed in vnode-m
gmt/monitor
queue"
,
pMsg
);
switch
(
msgType
)
{
case
TDMT_MON_VM_INFO
:
...
...
@@ -52,12 +53,12 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
break
;
default:
terrno
=
TSDB_CODE_MSG_NOT_PROCESSED
;
dError
(
"msg:%p, not processed in vnode-mgmt queue"
,
pMsg
);
dError
(
"msg:%p, not processed in vnode-mgmt
/monitor
queue"
,
pMsg
);
}
if
(
msgType
&
1u
)
{
if
(
code
!=
0
&&
terrno
!=
0
)
code
=
terrno
;
vmSendRsp
(
pM
gmt
->
pWrapper
,
pM
sg
,
code
);
vmSendRsp
(
pMsg
,
code
);
}
dTrace
(
"msg:%p, is freed, result:0x%04x:%s"
,
pMsg
,
code
&
0XFFFF
,
tstrerror
(
code
));
...
...
@@ -71,7 +72,9 @@ static void vmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
dTrace
(
"msg:%p, will be processed in vnode-query queue"
,
pMsg
);
int32_t
code
=
vnodeProcessQueryMsg
(
pVnode
->
pImpl
,
&
pMsg
->
rpcMsg
);
if
(
code
!=
0
)
{
vmSendRsp
(
pVnode
->
pWrapper
,
pMsg
,
code
);
if
(
terrno
!=
0
)
code
=
terrno
;
vmSendRsp
(
pMsg
,
code
);
dTrace
(
"msg:%p, is freed, result:0x%04x:%s"
,
pMsg
,
code
&
0XFFFF
,
tstrerror
(
code
));
rpcFreeCont
(
pMsg
->
rpcMsg
.
pCont
);
taosFreeQitem
(
pMsg
);
...
...
@@ -84,7 +87,9 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
dTrace
(
"msg:%p, will be processed in vnode-fetch queue"
,
pMsg
);
int32_t
code
=
vnodeProcessFetchMsg
(
pVnode
->
pImpl
,
&
pMsg
->
rpcMsg
,
pInfo
);
if
(
code
!=
0
)
{
vmSendRsp
(
pVnode
->
pWrapper
,
pMsg
,
code
);
if
(
terrno
!=
0
)
code
=
terrno
;
vmSendRsp
(
pMsg
,
code
);
dTrace
(
"msg:%p, is freed, result:0x%04x:%s"
,
pMsg
,
code
&
0XFFFF
,
tstrerror
(
code
));
rpcFreeCont
(
pMsg
->
rpcMsg
.
pCont
);
taosFreeQitem
(
pMsg
);
...
...
@@ -108,32 +113,10 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
dTrace
(
"msg:%p, will be processed in vnode-write queue"
,
pMsg
);
if
(
taosArrayPush
(
pArray
,
&
pMsg
)
==
NULL
)
{
dTrace
(
"msg:%p, failed to process since %s"
,
pMsg
,
terrstr
());
vmSendRsp
(
p
Vnode
->
pWrapper
,
p
Msg
,
TSDB_CODE_OUT_OF_MEMORY
);
vmSendRsp
(
pMsg
,
TSDB_CODE_OUT_OF_MEMORY
);
}
}
#if 0
int64_t version;
vnodePreprocessWriteReqs(pVnode->pImpl, pArray, &version);
numOfMsgs = taosArrayGetSize(pArray);
for (int32_t i = 0; i < numOfMsgs; i++) {
SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
SRpcMsg *pRpc = &pMsg->rpcMsg;
rsp.pCont = NULL;
rsp.contLen = 0;
rsp.code = 0;
rsp.handle = pRpc->handle;
rsp.ahandle = pRpc->ahandle;
rsp.refId = pRpc->refId;
int32_t code = vnodeProcessWriteReq(pVnode->pImpl, pRpc, version++, &rsp);
tmsgSendRsp(&rsp);
}
#else
// sync integration response
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
pArray
);
i
++
)
{
SNodeMsg
*
pMsg
;
SRpcMsg
*
pRpc
;
...
...
@@ -168,7 +151,6 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
assert
(
0
);
}
}
#endif
for
(
int32_t
i
=
0
;
i
<
numOfMsgs
;
i
++
)
{
SNodeMsg
*
pMsg
=
*
(
SNodeMsg
**
)
taosArrayGet
(
pArray
,
i
);
...
...
@@ -186,9 +168,6 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
SRpcMsg
rsp
;
for
(
int32_t
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
#if 1
// sync integration
taosGetQitem
(
qall
,
(
void
**
)
&
pMsg
);
// init response rpc msg
...
...
@@ -219,7 +198,6 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
rsp
.
refId
=
pMsg
->
rpcMsg
.
refId
;
tmsgSendRsp
(
&
rsp
);
}
#endif
}
}
...
...
@@ -246,7 +224,9 @@ static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
dTrace
(
"msg:%p, will be processed in vnode-merge queue"
,
pMsg
);
int32_t
code
=
vnodeProcessFetchMsg
(
pVnode
->
pImpl
,
&
pMsg
->
rpcMsg
,
pInfo
);
if
(
code
!=
0
)
{
vmSendRsp
(
pVnode
->
pWrapper
,
pMsg
,
code
);
if
(
terrno
!=
0
)
code
=
terrno
;
vmSendRsp
(
pMsg
,
code
);
dTrace
(
"msg:%p, is freed, result:0x%04x:%s"
,
pMsg
,
code
&
0XFFFF
,
tstrerror
(
code
));
rpcFreeCont
(
pMsg
->
rpcMsg
.
pCont
);
taosFreeQitem
(
pMsg
);
...
...
@@ -257,16 +237,17 @@ static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
static
int32_t
vmPutNodeMsgToQueue
(
SVnodesMgmt
*
pMgmt
,
SNodeMsg
*
pMsg
,
EQueueType
qtype
)
{
SRpcMsg
*
pRpc
=
&
pMsg
->
rpcMsg
;
SMsgHead
*
pHead
=
pRpc
->
pCont
;
int32_t
code
=
0
;
pHead
->
contLen
=
ntohl
(
pHead
->
contLen
);
pHead
->
vgId
=
ntohl
(
pHead
->
vgId
);
SVnodeObj
*
pVnode
=
vmAcquireVnode
(
pMgmt
,
pHead
->
vgId
);
if
(
pVnode
==
NULL
)
{
dError
(
"vgId:%d, failed to write msg:%p to vnode-queue since %s"
,
pHead
->
vgId
,
pMsg
,
terrstr
());
return
terrno
;
return
terrno
!=
0
?
terrno
:
-
1
;
}
int32_t
code
=
0
;
switch
(
qtype
)
{
case
QUERY_QUEUE
:
dTrace
(
"msg:%p, type:%s will be written into vnode-query queue"
,
pMsg
,
TMSG_INFO
(
pRpc
->
msgType
));
...
...
@@ -326,7 +307,7 @@ int32_t vmProcessMergeMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
int32_t
vmProcessMgmtMsg
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
)
{
SVnodesMgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
SSingleWorker
*
pWorker
=
&
pMgmt
->
mgmtWorker
;
dTrace
(
"msg:%p, will be
written
to vnode-mgmt queue, worker:%s"
,
pMsg
,
pWorker
->
name
);
dTrace
(
"msg:%p, will be
put in
to vnode-mgmt queue, worker:%s"
,
pMsg
,
pWorker
->
name
);
taosWriteQitem
(
pWorker
->
queue
,
pMsg
);
return
0
;
}
...
...
@@ -335,7 +316,7 @@ int32_t vmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SVnodesMgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
SSingleWorker
*
pWorker
=
&
pMgmt
->
monitorWorker
;
dTrace
(
"msg:%p,
put into
worker:%s"
,
pMsg
,
pWorker
->
name
);
dTrace
(
"msg:%p,
will be put into vnode-monitor queue,
worker:%s"
,
pMsg
,
pWorker
->
name
);
taosWriteQitem
(
pWorker
->
queue
,
pMsg
);
return
0
;
}
...
...
@@ -350,9 +331,7 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueT
SNodeMsg
*
pMsg
=
taosAllocateQitem
(
sizeof
(
SNodeMsg
));
int32_t
code
=
0
;
if
(
pMsg
==
NULL
)
{
code
=
-
1
;
}
else
{
if
(
pMsg
!=
NULL
)
{
dTrace
(
"msg:%p, is created, type:%s"
,
pMsg
,
TMSG_INFO
(
pRpc
->
msgType
));
pMsg
->
rpcMsg
=
*
pRpc
;
// if (pMsg->rpcMsg.handle != NULL) assert(pMsg->rpcMsg.refId != 0);
...
...
@@ -377,7 +356,7 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueT
dTrace
(
"msg:%p, will be put into vnode-merge queue"
,
pMsg
);
taosWriteQitem
(
pVnode
->
pMergeQ
,
pMsg
);
break
;
case
SYNC_QUEUE
:
// sync integration
case
SYNC_QUEUE
:
dTrace
(
"msg:%p, will be put into vnode-sync queue"
,
pMsg
);
taosWriteQitem
(
pVnode
->
pSyncQ
,
pMsg
);
break
;
...
...
@@ -387,6 +366,7 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueT
break
;
}
}
vmReleaseVnode
(
pMgmt
,
pVnode
);
return
code
;
}
...
...
@@ -395,6 +375,14 @@ int32_t vmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
return
vmPutRpcMsgToQueue
(
pWrapper
,
pRpc
,
WRITE_QUEUE
);
}
int32_t
vmPutMsgToSyncQueue
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pRpc
)
{
return
vmPutRpcMsgToQueue
(
pWrapper
,
pRpc
,
SYNC_QUEUE
);
}
int32_t
vmPutMsgToApplyQueue
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pRpc
)
{
return
vmPutRpcMsgToQueue
(
pWrapper
,
pRpc
,
APPLY_QUEUE
);
}
int32_t
vmPutMsgToQueryQueue
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pRpc
)
{
return
vmPutRpcMsgToQueue
(
pWrapper
,
pRpc
,
QUERY_QUEUE
);
}
...
...
@@ -403,30 +391,15 @@ int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
return
vmPutRpcMsgToQueue
(
pWrapper
,
pRpc
,
FETCH_QUEUE
);
}
int32_t
vmPutMsgToApplyQueue
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pRpc
)
{
return
vmPutRpcMsgToQueue
(
pWrapper
,
pRpc
,
APPLY_QUEUE
);
}
int32_t
vmPutMsgToMergeQueue
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pRpc
)
{
return
vmPutRpcMsgToQueue
(
pWrapper
,
pRpc
,
MERGE_QUEUE
);
}
// sync integration
int32_t
vmPutMsgToSyncQueue
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pRpc
)
{
return
vmPutRpcMsgToQueue
(
pWrapper
,
pRpc
,
SYNC_QUEUE
);
}
int32_t
vmGetQueueSize
(
SMgmtWrapper
*
pWrapper
,
int32_t
vgId
,
EQueueType
qtype
)
{
int32_t
size
=
-
1
;
SVnodeObj
*
pVnode
=
vmAcquireVnode
(
pWrapper
->
pMgmt
,
vgId
);
if
(
pVnode
!=
NULL
)
{
switch
(
qtype
)
{
case
QUERY_QUEUE
:
size
=
taosQueueSize
(
pVnode
->
pQueryQ
);
break
;
case
FETCH_QUEUE
:
size
=
taosQueueSize
(
pVnode
->
pFetchQ
);
break
;
case
WRITE_QUEUE
:
size
=
taosQueueSize
(
pVnode
->
pWriteQ
);
break
;
...
...
@@ -436,6 +409,12 @@ int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) {
case
APPLY_QUEUE
:
size
=
taosQueueSize
(
pVnode
->
pApplyQ
);
break
;
case
QUERY_QUEUE
:
size
=
taosQueueSize
(
pVnode
->
pQueryQ
);
break
;
case
FETCH_QUEUE
:
size
=
taosQueueSize
(
pVnode
->
pFetchQ
);
break
;
case
MERGE_QUEUE
:
size
=
taosQueueSize
(
pVnode
->
pMergeQ
);
break
;
...
...
@@ -449,14 +428,14 @@ int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) {
int32_t
vmAllocQueue
(
SVnodesMgmt
*
pMgmt
,
SVnodeObj
*
pVnode
)
{
pVnode
->
pWriteQ
=
tWWorkerAllocQueue
(
&
pMgmt
->
writePool
,
pVnode
,
(
FItems
)
vmProcessWriteQueue
);
pVnode
->
pApplyQ
=
tWWorkerAllocQueue
(
&
pMgmt
->
writePool
,
pVnode
,
(
FItems
)
vmProcessApplyQueue
);
pVnode
->
pMergeQ
=
tWWorkerAllocQueue
(
&
pMgmt
->
mergePool
,
pVnode
,
(
FItems
)
vmProcessMergeQueue
);
pVnode
->
pSyncQ
=
tWWorkerAllocQueue
(
&
pMgmt
->
syncPool
,
pVnode
,
(
FItems
)
vmProcessSyncQueue
);
pVnode
->
p
FetchQ
=
tQWorkerAllocQueue
(
&
pMgmt
->
fetchPool
,
pVnode
,
(
FItem
)
vmProcessFetch
Queue
);
pVnode
->
p
ApplyQ
=
tWWorkerAllocQueue
(
&
pMgmt
->
writePool
,
pVnode
,
(
FItems
)
vmProcessApply
Queue
);
pVnode
->
pQueryQ
=
tQWorkerAllocQueue
(
&
pMgmt
->
queryPool
,
pVnode
,
(
FItem
)
vmProcessQueryQueue
);
pVnode
->
pFetchQ
=
tQWorkerAllocQueue
(
&
pMgmt
->
fetchPool
,
pVnode
,
(
FItem
)
vmProcessFetchQueue
);
pVnode
->
pMergeQ
=
tWWorkerAllocQueue
(
&
pMgmt
->
mergePool
,
pVnode
,
(
FItems
)
vmProcessMergeQueue
);
if
(
pVnode
->
p
ApplyQ
==
NULL
||
pVnode
->
pWriteQ
==
NULL
||
pVnode
->
pSyncQ
==
NULL
||
pVnode
->
pFetch
Q
==
NULL
||
pVnode
->
p
Query
Q
==
NULL
||
pVnode
->
pMergeQ
==
NULL
)
{
if
(
pVnode
->
p
WriteQ
==
NULL
||
pVnode
->
pSyncQ
==
NULL
||
pVnode
->
pApplyQ
==
NULL
||
pVnode
->
pQuery
Q
==
NULL
||
pVnode
->
p
Fetch
Q
==
NULL
||
pVnode
->
pMergeQ
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
...
...
@@ -466,17 +445,17 @@ int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
}
void
vmFreeQueue
(
SVnodesMgmt
*
pMgmt
,
SVnodeObj
*
pVnode
)
{
tQWorkerFreeQueue
(
&
pMgmt
->
queryPool
,
pVnode
->
pQueryQ
);
tQWorkerFreeQueue
(
&
pMgmt
->
fetchPool
,
pVnode
->
pFetchQ
);
tWWorkerFreeQueue
(
&
pMgmt
->
writePool
,
pVnode
->
pWriteQ
);
tWWorkerFreeQueue
(
&
pMgmt
->
syncPool
,
pVnode
->
pSyncQ
);
tWWorkerFreeQueue
(
&
pMgmt
->
writePool
,
pVnode
->
pApplyQ
);
tQWorkerFreeQueue
(
&
pMgmt
->
queryPool
,
pVnode
->
pQueryQ
);
tQWorkerFreeQueue
(
&
pMgmt
->
fetchPool
,
pVnode
->
pFetchQ
);
tWWorkerFreeQueue
(
&
pMgmt
->
mergePool
,
pVnode
->
pMergeQ
);
tWWorkerFreeQueue
(
&
pMgmt
->
syncPool
,
pVnode
->
pSyncQ
);
pVnode
->
pWriteQ
=
NULL
;
pVnode
->
pApplyQ
=
NULL
;
pVnode
->
pSyncQ
=
NULL
;
pVnode
->
p
Fetch
Q
=
NULL
;
pVnode
->
p
Apply
Q
=
NULL
;
pVnode
->
pQueryQ
=
NULL
;
pVnode
->
pFetchQ
=
NULL
;
pVnode
->
pMergeQ
=
NULL
;
dDebug
(
"vgId:%d, vnode queue is freed"
,
pVnode
->
vgId
);
}
...
...
@@ -499,17 +478,23 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
pWPool
->
max
=
tsNumOfVnodeWriteThreads
;
if
(
tWWorkerInit
(
pWPool
)
!=
0
)
return
-
1
;
pWPool
=
&
pMgmt
->
syncPool
;
pWPool
->
name
=
"vnode-sync"
;
pWPool
->
max
=
tsNumOfVnodeSyncThreads
;
if
(
tWWorkerInit
(
pWPool
)
!=
0
)
return
-
1
;
pWPool
=
&
pMgmt
->
mergePool
;
pWPool
->
name
=
"vnode-merge"
;
pWPool
->
max
=
tsNumOfVnodeMergeThreads
;
if
(
tWWorkerInit
(
pWPool
)
!=
0
)
return
-
1
;
SSingleWorkerCfg
cfg
=
{.
min
=
1
,
.
max
=
1
,
.
name
=
"vnode-mgmt"
,
.
fp
=
(
FItem
)
vmProcessMgmtQueue
,
.
param
=
pMgmt
};
SWWorkerPool
*
pSPool
=
&
pMgmt
->
syncPool
;
pSPool
->
name
=
"vnode-sync"
;
pSPool
->
max
=
tsNumOfVnodeSyncThreads
;
if
(
tWWorkerInit
(
pSPool
)
!=
0
)
return
-
1
;
SWWorkerPool
*
pMPool
=
&
pMgmt
->
mergePool
;
pMPool
->
name
=
"vnode-merge"
;
pMPool
->
max
=
tsNumOfVnodeMergeThreads
;
if
(
tWWorkerInit
(
pMPool
)
!=
0
)
return
-
1
;
SSingleWorkerCfg
cfg
=
{
.
min
=
1
,
.
max
=
1
,
.
name
=
"vnode-mgmt"
,
.
fp
=
(
FItem
)
vmProcessMgmtMonitorQueue
,
.
param
=
pMgmt
,
};
if
(
tSingleWorkerInit
(
&
pMgmt
->
mgmtWorker
,
&
cfg
)
!=
0
)
{
dError
(
"failed to start vnode-mgmt worker since %s"
,
terrstr
());
return
-
1
;
...
...
@@ -517,7 +502,12 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
if
(
tsMultiProcess
)
{
SSingleWorkerCfg
mCfg
=
{
.
min
=
1
,
.
max
=
1
,
.
name
=
"vnode-monitor"
,
.
fp
=
(
FItem
)
vmProcessMgmtQueue
,
.
param
=
pMgmt
};
.
min
=
1
,
.
max
=
1
,
.
name
=
"vnode-monitor"
,
.
fp
=
(
FItem
)
vmProcessMgmtMonitorQueue
,
.
param
=
pMgmt
,
};
if
(
tSingleWorkerInit
(
&
pMgmt
->
monitorWorker
,
&
mCfg
)
!=
0
)
{
dError
(
"failed to start mnode vnode-monitor worker since %s"
,
terrstr
());
return
-
1
;
...
...
@@ -531,10 +521,10 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
void
vmStopWorker
(
SVnodesMgmt
*
pMgmt
)
{
tSingleWorkerCleanup
(
&
pMgmt
->
monitorWorker
);
tSingleWorkerCleanup
(
&
pMgmt
->
mgmtWorker
);
tQWorkerCleanup
(
&
pMgmt
->
fetchPool
);
tQWorkerCleanup
(
&
pMgmt
->
queryPool
);
tWWorkerCleanup
(
&
pMgmt
->
writePool
);
tWWorkerCleanup
(
&
pMgmt
->
syncPool
);
tQWorkerCleanup
(
&
pMgmt
->
queryPool
);
tQWorkerCleanup
(
&
pMgmt
->
fetchPool
);
tWWorkerCleanup
(
&
pMgmt
->
mergePool
);
dDebug
(
"vnode workers are closed"
);
}
source/dnode/mnode/sdb/src/sdbFile.c
浏览文件 @
34511150
...
...
@@ -310,7 +310,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
code
=
taosFsyncFile
(
pFile
);
if
(
code
!=
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
mError
(
"failed to
write
file:%s since %s"
,
tmpfile
,
tstrerror
(
code
));
mError
(
"failed to
sync
file:%s since %s"
,
tmpfile
,
tstrerror
(
code
));
}
}
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
34511150
...
...
@@ -148,7 +148,7 @@ struct STsdbCfg {
struct
SVnodeCfg
{
int32_t
vgId
;
char
dbname
[
TSDB_DB_NAME_LEN
];
char
dbname
[
TSDB_DB_
F
NAME_LEN
];
uint64_t
dbId
;
int32_t
szPage
;
int32_t
szCache
;
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
34511150
...
...
@@ -1985,7 +1985,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
return
;
}
else
if
(
pCheckInfo
->
iter
!=
NULL
||
pCheckInfo
->
iiter
!=
NULL
)
{
SSkipListNode
*
node
=
NULL
;
TSKEY
last
RowKey
=
TSKEY_INITIAL_VAL
;
TSKEY
last
KeyAppend
=
TSKEY_INITIAL_VAL
;
do
{
STSRow
*
row2
=
NULL
;
...
...
@@ -2019,7 +2019,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
}
numOfRows
+=
mergeTwoRowFromMem
(
pTsdbReadHandle
,
pTsdbReadHandle
->
outputCapacity
,
&
curRow
,
row1
,
row2
,
numOfCols
,
pCheckInfo
->
tableId
,
pSchema1
,
pSchema2
,
pCfg
->
update
,
&
last
RowKey
);
pCheckInfo
->
tableId
,
pSchema1
,
pSchema2
,
pCfg
->
update
,
&
last
KeyAppend
);
// numOfRows += 1;
if
(
cur
->
win
.
skey
==
TSKEY_INITIAL_VAL
)
{
cur
->
win
.
skey
=
key
;
...
...
@@ -2076,7 +2076,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
}
numOfRows
+=
mergeTwoRowFromMem
(
pTsdbReadHandle
,
pTsdbReadHandle
->
outputCapacity
,
&
curRow
,
row1
,
row2
,
numOfCols
,
pCheckInfo
->
tableId
,
pSchema1
,
pSchema2
,
pCfg
->
update
,
&
last
RowKey
);
pCheckInfo
->
tableId
,
pSchema1
,
pSchema2
,
pCfg
->
update
,
&
last
KeyAppend
);
// ++numOfRows;
if
(
cur
->
win
.
skey
==
TSKEY_INITIAL_VAL
)
{
cur
->
win
.
skey
=
key
;
...
...
@@ -2117,10 +2117,13 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
int32_t
qstart
=
0
,
qend
=
0
;
getQualifiedRowsPos
(
pTsdbReadHandle
,
pos
,
end
,
numOfRows
,
&
qstart
,
&
qend
);
lastKeyAppend
=
tsArray
[
qend
];
numOfRows
=
doCopyRowsFromFileBlock
(
pTsdbReadHandle
,
pTsdbReadHandle
->
outputCapacity
,
numOfRows
,
qstart
,
qend
);
pos
+=
(
qend
-
qstart
+
1
)
*
step
;
if
(
numOfRows
>
0
)
{
curRow
=
numOfRows
-
1
;
}
cur
->
win
.
ekey
=
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
)
?
tsArray
[
qend
]
:
tsArray
[
qstart
];
cur
->
lastKey
=
cur
->
win
.
ekey
+
step
;
}
...
...
source/libs/function/CMakeLists.txt
浏览文件 @
34511150
...
...
@@ -36,7 +36,7 @@ target_link_libraries(
PRIVATE os util common nodes function
)
add_library
(
udf1 MODULE test/udf1.c
)
add_library
(
udf1
STATIC
MODULE test/udf1.c
)
target_include_directories
(
udf1
PUBLIC
...
...
@@ -50,7 +50,7 @@ target_include_directories(
target_link_libraries
(
udf1 PUBLIC os
)
add_library
(
udf2 MODULE test/udf2.c
)
add_library
(
udf2
STATIC
MODULE test/udf2.c
)
target_include_directories
(
udf2
PUBLIC
...
...
source/libs/transport/src/transSrv.c
浏览文件 @
34511150
...
...
@@ -67,6 +67,7 @@ typedef struct SSrvMsg {
typedef
struct
SWorkThrdObj
{
TdThread
thread
;
uv_connect_t
connect_req
;
uv_pipe_t
*
pipe
;
uv_os_fd_t
fd
;
uv_loop_t
*
loop
;
...
...
@@ -87,8 +88,10 @@ typedef struct SServerObj {
// work thread info
int
workerIdx
;
int
numOfThreads
;
int
numOfWorkerReady
;
SWorkThrdObj
**
pThreadObj
;
uv_pipe_t
pipeListen
;
uv_pipe_t
**
pipe
;
uint32_t
ip
;
uint32_t
port
;
...
...
@@ -161,7 +164,7 @@ static void* transWorkerThread(void* arg);
static
void
*
transAcceptThread
(
void
*
arg
);
// add handle loop
static
bool
addHandleToWorkloop
(
void
*
arg
);
static
bool
addHandleToWorkloop
(
SWorkThrdObj
*
pThrd
,
char
*
pipeName
);
static
bool
addHandleToAcceptloop
(
void
*
arg
);
#define CONN_SHOULD_RELEASE(conn, head) \
...
...
@@ -577,6 +580,12 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) {
uv_tcp_init
(
pObj
->
loop
,
cli
);
if
(
uv_accept
(
stream
,
(
uv_stream_t
*
)
cli
)
==
0
)
{
if
(
pObj
->
numOfWorkerReady
<
pObj
->
numOfThreads
)
{
tError
(
"worker-threads are not ready for all, need %d instead of %d."
,
pObj
->
numOfThreads
,
pObj
->
numOfWorkerReady
);
uv_close
((
uv_handle_t
*
)
cli
,
NULL
);
return
;
}
uv_write_t
*
wr
=
(
uv_write_t
*
)
taosMemoryMalloc
(
sizeof
(
uv_write_t
));
wr
->
data
=
cli
;
uv_buf_t
buf
=
uv_buf_init
((
char
*
)
notify
,
strlen
(
notify
));
...
...
@@ -672,15 +681,21 @@ void* transAcceptThread(void* arg) {
return
NULL
;
}
static
bool
addHandleToWorkloop
(
void
*
arg
)
{
SWorkThrdObj
*
pThrd
=
arg
;
void
uvOnPipeConnectionCb
(
uv_connect_t
*
connect
,
int
status
)
{
if
(
status
!=
0
)
{
return
;
}
SWorkThrdObj
*
pThrd
=
container_of
(
connect
,
SWorkThrdObj
,
connect_req
);
uv_read_start
((
uv_stream_t
*
)
pThrd
->
pipe
,
uvAllocConnBufferCb
,
uvOnConnectionCb
);
}
static
bool
addHandleToWorkloop
(
SWorkThrdObj
*
pThrd
,
char
*
pipeName
)
{
pThrd
->
loop
=
(
uv_loop_t
*
)
taosMemoryMalloc
(
sizeof
(
uv_loop_t
));
if
(
0
!=
uv_loop_init
(
pThrd
->
loop
))
{
return
false
;
}
uv_pipe_init
(
pThrd
->
loop
,
pThrd
->
pipe
,
1
);
uv_pipe_open
(
pThrd
->
pipe
,
pThrd
->
fd
);
// int r =
uv_pipe_open(pThrd->pipe, pThrd->fd);
pThrd
->
pipe
->
data
=
pThrd
;
...
...
@@ -691,7 +706,8 @@ static bool addHandleToWorkloop(void* arg) {
QUEUE_INIT
(
&
pThrd
->
conn
);
pThrd
->
asyncPool
=
transCreateAsyncPool
(
pThrd
->
loop
,
5
,
pThrd
,
uvWorkerAsyncCb
);
uv_read_start
((
uv_stream_t
*
)
pThrd
->
pipe
,
uvAllocConnBufferCb
,
uvOnConnectionCb
);
uv_pipe_connect
(
&
pThrd
->
connect_req
,
pThrd
->
pipe
,
pipeName
,
uvOnPipeConnectionCb
);
// uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
return
true
;
}
...
...
@@ -802,12 +818,32 @@ static void uvDestroyConn(uv_handle_t* handle) {
uv_walk
(
thrd
->
loop
,
uvWalkCb
,
NULL
);
}
}
static
void
uvPipeListenCb
(
uv_stream_t
*
handle
,
int
status
)
{
ASSERT
(
status
==
0
);
SServerObj
*
srv
=
container_of
(
handle
,
SServerObj
,
pipeListen
);
uv_pipe_t
*
pipe
=
&
(
srv
->
pipe
[
srv
->
numOfWorkerReady
][
0
]);
ASSERT
(
0
==
uv_pipe_init
(
srv
->
loop
,
pipe
,
1
));
ASSERT
(
0
==
uv_accept
((
uv_stream_t
*
)
&
srv
->
pipeListen
,
(
uv_stream_t
*
)
pipe
));
ASSERT
(
1
==
uv_is_readable
((
uv_stream_t
*
)
pipe
));
ASSERT
(
1
==
uv_is_writable
((
uv_stream_t
*
)
pipe
));
ASSERT
(
0
==
uv_is_closing
((
uv_handle_t
*
)
pipe
));
srv
->
numOfWorkerReady
++
;
// ASSERT(0 == uv_listen((uv_stream_t*)&ctx.send.tcp, 512, uvOnAcceptCb));
// r = uv_read_start((uv_stream_t*)&ctx.channel, alloc_cb, read_cb);
// ASSERT(r == 0);
}
void
*
transInitServer
(
uint32_t
ip
,
uint32_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
)
{
SServerObj
*
srv
=
taosMemoryCalloc
(
1
,
sizeof
(
SServerObj
));
srv
->
loop
=
(
uv_loop_t
*
)
taosMemoryMalloc
(
sizeof
(
uv_loop_t
));
srv
->
numOfThreads
=
numOfThreads
;
srv
->
workerIdx
=
0
;
srv
->
numOfWorkerReady
=
0
;
srv
->
pThreadObj
=
(
SWorkThrdObj
**
)
taosMemoryCalloc
(
srv
->
numOfThreads
,
sizeof
(
SWorkThrdObj
*
));
srv
->
pipe
=
(
uv_pipe_t
**
)
taosMemoryCalloc
(
srv
->
numOfThreads
,
sizeof
(
uv_pipe_t
*
));
srv
->
ip
=
ip
;
...
...
@@ -817,6 +853,16 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
taosThreadOnce
(
&
transModuleInit
,
uvInitEnv
);
transSrvInst
++
;
char
pipeName
[
64
];
assert
(
0
==
uv_pipe_init
(
srv
->
loop
,
&
srv
->
pipeListen
,
0
));
#ifdef WINDOWS
snprintf
(
pipeName
,
sizeof
(
pipeName
),
"
\\\\
?
\\
pipe
\\
trans.rpc
\\
%p-%lu"
,
taosSafeRand
(),
GetCurrentProcessId
());
#else
snprintf
(
pipeName
,
sizeof
(
pipeName
),
".trans.rpc
\\
%08X-%lu"
,
taosSafeRand
(),
taosGetSelfPthreadId
());
#endif
assert
(
0
==
uv_pipe_bind
(
&
srv
->
pipeListen
,
pipeName
));
assert
(
0
==
uv_listen
((
uv_stream_t
*
)
&
srv
->
pipeListen
,
SOMAXCONN
,
uvPipeListenCb
));
for
(
int
i
=
0
;
i
<
srv
->
numOfThreads
;
i
++
)
{
SWorkThrdObj
*
thrd
=
(
SWorkThrdObj
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SWorkThrdObj
));
thrd
->
pTransInst
=
shandle
;
...
...
@@ -826,17 +872,22 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
srv
->
pipe
[
i
]
=
(
uv_pipe_t
*
)
taosMemoryCalloc
(
2
,
sizeof
(
uv_pipe_t
));
uv_os_sock_t
fds
[
2
];
if
(
uv_socketpair
(
SOCK_STREAM
,
0
,
fds
,
UV_NONBLOCK_PIPE
,
UV_NONBLOCK_PIPE
)
!=
0
)
{
goto
End
;
}
uv_pipe_init
(
srv
->
loop
,
&
(
srv
->
pipe
[
i
][
0
]),
1
);
uv_pipe_open
(
&
(
srv
->
pipe
[
i
][
0
]),
fds
[
1
]);
// init write
thrd
->
fd
=
fds
[
0
];
// #ifdef WINDOWS
// uv_file fds[2];
// if (uv_pipe(fds, UV_READABLE_PIPE|UV_WRITABLE_PIPE|UV_NONBLOCK_PIPE, UV_READABLE_PIPE|UV_WRITABLE_PIPE|UV_NONBLOCK_PIPE) != 0) {
// #else
// uv_os_sock_t fds[2];
// if (uv_socketpair(SOCK_STREAM, 0, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
// #endif
// goto End;
// }
// uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
// uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write
// thrd->fd = fds[0];
thrd
->
pipe
=
&
(
srv
->
pipe
[
i
][
1
]);
// init read
if
(
false
==
addHandleToWorkloop
(
thrd
))
{
if
(
false
==
addHandleToWorkloop
(
thrd
,
pipeName
))
{
goto
End
;
}
int
err
=
taosThreadCreate
(
&
(
thrd
->
thread
),
NULL
,
transWorkerThread
,
(
void
*
)(
thrd
));
...
...
source/os/src/osDir.c
浏览文件 @
34511150
...
...
@@ -204,7 +204,7 @@ int32_t taosExpandDir(const char *dirname, char *outname, int32_t maxlen) {
int32_t
taosRealPath
(
char
*
dirname
,
char
*
realPath
,
int32_t
maxlen
)
{
char
tmp
[
PATH_MAX
]
=
{
0
};
#ifdef WINDOWS
if
(
_fullpath
(
dirname
,
tmp
,
maxlen
)
!=
NULL
)
{
if
(
_fullpath
(
tmp
,
dirname
,
maxlen
)
!=
NULL
)
{
#else
if
(
realpath
(
dirname
,
tmp
)
!=
NULL
)
{
#endif
...
...
source/os/src/osFile.c
浏览文件 @
34511150
...
...
@@ -543,7 +543,7 @@ int32_t taosFsyncFile(TdFilePtr pFile) {
HANDLE
h
=
(
HANDLE
)
_get_osfhandle
(
pFile
->
fd
);
return
FlushFileBuffers
(
h
);
return
!
FlushFileBuffers
(
h
);
#else
if
(
pFile
==
NULL
)
{
return
0
;
...
...
source/os/src/osSysinfo.c
浏览文件 @
34511150
...
...
@@ -869,11 +869,15 @@ SysNameInfo taosGetSysNameInfo() {
SysNameInfo
info
=
{
0
};
DWORD
dwVersion
=
GetVersion
();
tstrncpy
(
info
.
sysname
,
getenv
(
"OS"
),
sizeof
(
info
.
sysname
));
tstrncpy
(
info
.
nodename
,
getenv
(
"COMPUTERNAME"
),
sizeof
(
info
.
nodename
));
char
*
tmp
=
NULL
;
tmp
=
getenv
(
"OS"
);
if
(
tmp
!=
NULL
)
tstrncpy
(
info
.
sysname
,
tmp
,
sizeof
(
info
.
sysname
));
tmp
=
getenv
(
"COMPUTERNAME"
);
if
(
tmp
!=
NULL
)
tstrncpy
(
info
.
nodename
,
tmp
,
sizeof
(
info
.
nodename
));
sprintf_s
(
info
.
release
,
sizeof
(
info
.
release
),
"%d"
,
dwVersion
&
0x0F
);
sprintf_s
(
info
.
version
,
sizeof
(
info
.
release
),
"%d"
,
(
dwVersion
>>
8
)
&
0x0F
);
tstrncpy
(
info
.
machine
,
getenv
(
"PROCESSOR_ARCHITECTURE"
),
sizeof
(
info
.
machine
));
tmp
=
getenv
(
"PROCESSOR_ARCHITECTURE"
);
if
(
tmp
!=
NULL
)
tstrncpy
(
info
.
machine
,
tmp
,
sizeof
(
info
.
machine
));
return
info
;
#elif defined(_TD_DARWIN_64)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录