Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
f2320dc7
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
f2320dc7
编写于
1月 06, 2021
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/feature/sim' into feature/os
上级
9c7b14b7
899f9089
变更
8
显示空白变更内容
内联
并排
Showing
8 changed file
with
89 addition
and
35 deletion
+89
-35
src/inc/query.h
src/inc/query.h
+1
-0
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+13
-0
src/vnode/inc/vnodeRead.h
src/vnode/inc/vnodeRead.h
+1
-0
src/vnode/inc/vnodeWrite.h
src/vnode/inc/vnodeWrite.h
+1
-0
src/vnode/src/vnodeMain.c
src/vnode/src/vnodeMain.c
+12
-13
src/vnode/src/vnodeRead.c
src/vnode/src/vnodeRead.c
+31
-14
src/vnode/src/vnodeStatus.c
src/vnode/src/vnodeStatus.c
+19
-1
src/vnode/src/vnodeWrite.c
src/vnode/src/vnodeWrite.c
+11
-7
未找到文件。
src/inc/query.h
浏览文件 @
f2320dc7
...
@@ -86,6 +86,7 @@ void qDestroyQueryInfo(qinfo_t qHandle);
...
@@ -86,6 +86,7 @@ void qDestroyQueryInfo(qinfo_t qHandle);
void
*
qOpenQueryMgmt
(
int32_t
vgId
);
void
*
qOpenQueryMgmt
(
int32_t
vgId
);
void
qQueryMgmtNotifyClosed
(
void
*
pExecutor
);
void
qQueryMgmtNotifyClosed
(
void
*
pExecutor
);
void
qQueryMgmtReOpen
(
void
*
pExecutor
);
void
qCleanupQueryMgmt
(
void
*
pExecutor
);
void
qCleanupQueryMgmt
(
void
*
pExecutor
);
void
**
qRegisterQInfo
(
void
*
pMgmt
,
uint64_t
qInfo
);
void
**
qRegisterQInfo
(
void
*
pMgmt
,
uint64_t
qInfo
);
void
**
qAcquireQInfo
(
void
*
pMgmt
,
uint64_t
key
);
void
**
qAcquireQInfo
(
void
*
pMgmt
,
uint64_t
key
);
...
...
src/query/src/qExecutor.c
浏览文件 @
f2320dc7
...
@@ -7635,6 +7635,19 @@ void qQueryMgmtNotifyClosed(void* pQMgmt) {
...
@@ -7635,6 +7635,19 @@ void qQueryMgmtNotifyClosed(void* pQMgmt) {
taosCacheRefresh
(
pQueryMgmt
->
qinfoPool
,
queryMgmtKillQueryFn
);
taosCacheRefresh
(
pQueryMgmt
->
qinfoPool
,
queryMgmtKillQueryFn
);
}
}
void
qQueryMgmtReOpen
(
void
*
pQMgmt
)
{
if
(
pQMgmt
==
NULL
)
{
return
;
}
SQueryMgmt
*
pQueryMgmt
=
pQMgmt
;
qDebug
(
"vgId:%d, set querymgmt reopen"
,
pQueryMgmt
->
vgId
);
pthread_mutex_lock
(
&
pQueryMgmt
->
lock
);
pQueryMgmt
->
closed
=
false
;
pthread_mutex_unlock
(
&
pQueryMgmt
->
lock
);
}
void
qCleanupQueryMgmt
(
void
*
pQMgmt
)
{
void
qCleanupQueryMgmt
(
void
*
pQMgmt
)
{
if
(
pQMgmt
==
NULL
)
{
if
(
pQMgmt
==
NULL
)
{
return
;
return
;
...
...
src/vnode/inc/vnodeRead.h
浏览文件 @
f2320dc7
...
@@ -27,6 +27,7 @@ void vnodeCleanupRead(void);
...
@@ -27,6 +27,7 @@ void vnodeCleanupRead(void);
int32_t
vnodeWriteToRQueue
(
void
*
pVnode
,
void
*
pCont
,
int32_t
contLen
,
int8_t
qtype
,
void
*
rparam
);
int32_t
vnodeWriteToRQueue
(
void
*
pVnode
,
void
*
pCont
,
int32_t
contLen
,
int8_t
qtype
,
void
*
rparam
);
void
vnodeFreeFromRQueue
(
void
*
pVnode
,
SVReadMsg
*
pRead
);
void
vnodeFreeFromRQueue
(
void
*
pVnode
,
SVReadMsg
*
pRead
);
int32_t
vnodeProcessRead
(
void
*
pVnode
,
SVReadMsg
*
pRead
);
int32_t
vnodeProcessRead
(
void
*
pVnode
,
SVReadMsg
*
pRead
);
void
vnodeWaitReadCompleted
(
void
*
pVnode
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
src/vnode/inc/vnodeWrite.h
浏览文件 @
f2320dc7
...
@@ -27,6 +27,7 @@ void vnodeCleanupWrite(void);
...
@@ -27,6 +27,7 @@ void vnodeCleanupWrite(void);
int32_t
vnodeWriteToWQueue
(
void
*
pVnode
,
void
*
pHead
,
int32_t
qtype
,
void
*
pRpcMsg
);
int32_t
vnodeWriteToWQueue
(
void
*
pVnode
,
void
*
pHead
,
int32_t
qtype
,
void
*
pRpcMsg
);
void
vnodeFreeFromWQueue
(
void
*
pVnode
,
SVWriteMsg
*
pWrite
);
void
vnodeFreeFromWQueue
(
void
*
pVnode
,
SVWriteMsg
*
pWrite
);
int32_t
vnodeProcessWrite
(
void
*
pVnode
,
void
*
pHead
,
int32_t
qtype
,
void
*
pRspRet
);
int32_t
vnodeProcessWrite
(
void
*
pVnode
,
void
*
pHead
,
int32_t
qtype
,
void
*
pRspRet
);
void
vnodeWaitWriteCompleted
(
void
*
pVnode
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
src/vnode/src/vnodeMain.c
浏览文件 @
f2320dc7
...
@@ -153,6 +153,11 @@ static int32_t vnodeAlterImp(SVnodeObj *pVnode, SCreateVnodeMsg *pVnodeCfg) {
...
@@ -153,6 +153,11 @@ static int32_t vnodeAlterImp(SVnodeObj *pVnode, SCreateVnodeMsg *pVnodeCfg) {
int32_t
vnodeAlter
(
void
*
vparam
,
SCreateVnodeMsg
*
pVnodeCfg
)
{
int32_t
vnodeAlter
(
void
*
vparam
,
SCreateVnodeMsg
*
pVnodeCfg
)
{
SVnodeObj
*
pVnode
=
vparam
;
SVnodeObj
*
pVnode
=
vparam
;
if
(
pVnode
->
dbCfgVersion
==
pVnodeCfg
->
cfg
.
dbCfgVersion
&&
pVnode
->
vgCfgVersion
==
pVnodeCfg
->
cfg
.
vgCfgVersion
)
{
vDebug
(
"vgId:%d, dbCfgVersion:%d and vgCfgVersion:%d not change"
,
pVnode
->
vgId
,
pVnode
->
dbCfgVersion
,
pVnode
->
vgCfgVersion
);
return
TSDB_CODE_SUCCESS
;
}
// vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS
// vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS
// dbCfgVersion can be corrected by status msg
// dbCfgVersion can be corrected by status msg
...
@@ -411,15 +416,12 @@ void vnodeDestroy(SVnodeObj *pVnode) {
...
@@ -411,15 +416,12 @@ void vnodeDestroy(SVnodeObj *pVnode) {
}
}
void
vnodeCleanUp
(
SVnodeObj
*
pVnode
)
{
void
vnodeCleanUp
(
SVnodeObj
*
pVnode
)
{
if
(
!
vnodeInInitStatus
(
pVnode
))
{
vDebug
(
"vgId:%d, vnode will cleanup, refCount:%d pVnode:%p"
,
pVnode
->
vgId
,
pVnode
->
refCount
,
pVnode
);
// it may be in updateing or reset state, then it shall wait
int32_t
i
=
0
;
vnodeSetClosingStatus
(
pVnode
);
while
(
!
vnodeSetClosingStatus
(
pVnode
))
{
if
(
++
i
%
1000
==
0
)
{
// release local resources only after cutting off outside connections
sched_yield
();
qQueryMgmtNotifyClosed
(
pVnode
->
qMgmt
);
}
}
}
// stop replication module
// stop replication module
if
(
pVnode
->
sync
>
0
)
{
if
(
pVnode
->
sync
>
0
)
{
...
@@ -428,10 +430,7 @@ void vnodeCleanUp(SVnodeObj *pVnode) {
...
@@ -428,10 +430,7 @@ void vnodeCleanUp(SVnodeObj *pVnode) {
syncStop
(
sync
);
syncStop
(
sync
);
}
}
vDebug
(
"vgId:%d, vnode will cleanup, refCount:%d pVnode:%p"
,
pVnode
->
vgId
,
pVnode
->
refCount
,
pVnode
);
vDebug
(
"vgId:%d, vnode is cleaned, refCount:%d pVnode:%p"
,
pVnode
->
vgId
,
pVnode
->
refCount
,
pVnode
);
// release local resources only after cutting off outside connections
qQueryMgmtNotifyClosed
(
pVnode
->
qMgmt
);
vnodeRelease
(
pVnode
);
vnodeRelease
(
pVnode
);
}
}
...
...
src/vnode/src/vnodeRead.c
浏览文件 @
f2320dc7
...
@@ -88,22 +88,15 @@ void vnodeFreeFromRQueue(void *vparam, SVReadMsg *pRead) {
...
@@ -88,22 +88,15 @@ void vnodeFreeFromRQueue(void *vparam, SVReadMsg *pRead) {
vnodeRelease
(
pVnode
);
vnodeRelease
(
pVnode
);
}
}
int32_t
vnodeWriteToRQueue
(
void
*
vparam
,
void
*
pCont
,
int32_t
contLen
,
int8_t
qtype
,
void
*
rparam
)
{
static
SVReadMsg
*
vnodeBuildVReadMsg
(
SVnodeObj
*
pVnode
,
void
*
pCont
,
int32_t
contLen
,
int8_t
qtype
,
SRpcMsg
*
pRpcMsg
)
{
SVnodeObj
*
pVnode
=
vparam
;
if
(
qtype
==
TAOS_QTYPE_RPC
||
qtype
==
TAOS_QTYPE_QUERY
)
{
int32_t
code
=
vnodeCheckRead
(
pVnode
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
return
code
;
}
int32_t
size
=
sizeof
(
SVReadMsg
)
+
contLen
;
int32_t
size
=
sizeof
(
SVReadMsg
)
+
contLen
;
SVReadMsg
*
pRead
=
taosAllocateQitem
(
size
);
SVReadMsg
*
pRead
=
taosAllocateQitem
(
size
);
if
(
pRead
==
NULL
)
{
if
(
pRead
==
NULL
)
{
return
TSDB_CODE_VND_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_VND_OUT_OF_MEMORY
;
return
NULL
;
}
}
if
(
rparam
!=
NULL
)
{
if
(
pRpcMsg
!=
NULL
)
{
SRpcMsg
*
pRpcMsg
=
rparam
;
pRead
->
rpcHandle
=
pRpcMsg
->
handle
;
pRead
->
rpcHandle
=
pRpcMsg
->
handle
;
pRead
->
rpcAhandle
=
pRpcMsg
->
ahandle
;
pRead
->
rpcAhandle
=
pRpcMsg
->
ahandle
;
pRead
->
msgType
=
pRpcMsg
->
msgType
;
pRead
->
msgType
=
pRpcMsg
->
msgType
;
...
@@ -119,13 +112,35 @@ int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qt
...
@@ -119,13 +112,35 @@ int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qt
pRead
->
qtype
=
qtype
;
pRead
->
qtype
=
qtype
;
atomic_add_fetch_32
(
&
pVnode
->
refCount
,
1
);
atomic_add_fetch_32
(
&
pVnode
->
refCount
,
1
);
return
pRead
;
}
int32_t
vnodeWriteToRQueue
(
void
*
vparam
,
void
*
pCont
,
int32_t
contLen
,
int8_t
qtype
,
void
*
rparam
)
{
SVReadMsg
*
pRead
=
vnodeBuildVReadMsg
(
vparam
,
pCont
,
contLen
,
qtype
,
rparam
);
if
(
pRead
==
NULL
)
{
assert
(
terrno
!=
0
);
return
terrno
;
}
SVnodeObj
*
pVnode
=
vparam
;
int32_t
code
=
vnodeCheckRead
(
pVnode
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
taosFreeQitem
(
pRead
);
vnodeRelease
(
pVnode
);
return
code
;
}
atomic_add_fetch_32
(
&
pVnode
->
queuedRMsg
,
1
);
atomic_add_fetch_32
(
&
pVnode
->
queuedRMsg
,
1
);
if
(
pRead
->
code
==
TSDB_CODE_RPC_NETWORK_UNAVAIL
||
pRead
->
msgType
==
TSDB_MSG_TYPE_FETCH
)
{
if
(
pRead
->
code
==
TSDB_CODE_RPC_NETWORK_UNAVAIL
||
pRead
->
msgType
==
TSDB_MSG_TYPE_FETCH
)
{
vTrace
(
"vgId:%d, write into vfetch queue, refCount:%d queued:%d"
,
pVnode
->
vgId
,
pVnode
->
refCount
,
pVnode
->
queuedRMsg
);
vTrace
(
"vgId:%d, write into vfetch queue, refCount:%d queued:%d"
,
pVnode
->
vgId
,
pVnode
->
refCount
,
pVnode
->
queuedRMsg
);
return
taosWriteQitem
(
pVnode
->
fqueue
,
qtype
,
pRead
);
return
taosWriteQitem
(
pVnode
->
fqueue
,
qtype
,
pRead
);
}
else
{
}
else
{
vTrace
(
"vgId:%d, write into vquery queue, refCount:%d queued:%d"
,
pVnode
->
vgId
,
pVnode
->
refCount
,
pVnode
->
queuedRMsg
);
vTrace
(
"vgId:%d, write into vquery queue, refCount:%d queued:%d"
,
pVnode
->
vgId
,
pVnode
->
refCount
,
pVnode
->
queuedRMsg
);
return
taosWriteQitem
(
pVnode
->
qqueue
,
qtype
,
pRead
);
return
taosWriteQitem
(
pVnode
->
qqueue
,
qtype
,
pRead
);
}
}
}
}
...
@@ -420,3 +435,5 @@ int32_t vnodeNotifyCurrentQhandle(void *handle, void *qhandle, int32_t vgId) {
...
@@ -420,3 +435,5 @@ int32_t vnodeNotifyCurrentQhandle(void *handle, void *qhandle, int32_t vgId) {
vTrace
(
"QInfo:%p register qhandle to connect:%p"
,
qhandle
,
handle
);
vTrace
(
"QInfo:%p register qhandle to connect:%p"
,
qhandle
,
handle
);
return
rpcReportProgress
(
handle
,
(
char
*
)
pMsg
,
sizeof
(
SRetrieveTableMsg
));
return
rpcReportProgress
(
handle
,
(
char
*
)
pMsg
,
sizeof
(
SRetrieveTableMsg
));
}
}
void
vnodeWaitReadCompleted
(
void
*
pVnode
)
{}
\ No newline at end of file
src/vnode/src/vnodeStatus.c
浏览文件 @
f2320dc7
...
@@ -15,6 +15,8 @@
...
@@ -15,6 +15,8 @@
#define _DEFAULT_SOURCE
#define _DEFAULT_SOURCE
#include "os.h"
#include "os.h"
#include "taosmsg.h"
#include "query.h"
#include "vnodeStatus.h"
#include "vnodeStatus.h"
char
*
vnodeStatus
[]
=
{
char
*
vnodeStatus
[]
=
{
...
@@ -44,11 +46,13 @@ bool vnodeSetReadyStatus(SVnodeObj* pVnode) {
...
@@ -44,11 +46,13 @@ bool vnodeSetReadyStatus(SVnodeObj* pVnode) {
vDebug
(
"vgId:%d, cannot set status:ready, old:%s"
,
pVnode
->
vgId
,
vnodeStatus
[
pVnode
->
status
]);
vDebug
(
"vgId:%d, cannot set status:ready, old:%s"
,
pVnode
->
vgId
,
vnodeStatus
[
pVnode
->
status
]);
}
}
qQueryMgmtReOpen
(
pVnode
->
qMgmt
);
pthread_mutex_unlock
(
&
pVnode
->
statusMutex
);
pthread_mutex_unlock
(
&
pVnode
->
statusMutex
);
return
set
;
return
set
;
}
}
bool
vnodeSetClosingStatus
(
SVnodeObj
*
pVnode
)
{
static
bool
vnodeSetClosingStatusImp
(
SVnodeObj
*
pVnode
)
{
bool
set
=
false
;
bool
set
=
false
;
pthread_mutex_lock
(
&
pVnode
->
statusMutex
);
pthread_mutex_lock
(
&
pVnode
->
statusMutex
);
...
@@ -63,6 +67,20 @@ bool vnodeSetClosingStatus(SVnodeObj* pVnode) {
...
@@ -63,6 +67,20 @@ bool vnodeSetClosingStatus(SVnodeObj* pVnode) {
return
set
;
return
set
;
}
}
bool
vnodeSetClosingStatus
(
SVnodeObj
*
pVnode
)
{
if
(
!
vnodeInInitStatus
(
pVnode
))
{
// it may be in updating or reset state, then it shall wait
int32_t
i
=
0
;
while
(
!
vnodeSetClosingStatusImp
(
pVnode
))
{
if
(
++
i
%
1000
==
0
)
{
sched_yield
();
}
}
}
return
true
;
}
bool
vnodeSetUpdatingStatus
(
SVnodeObj
*
pVnode
)
{
bool
vnodeSetUpdatingStatus
(
SVnodeObj
*
pVnode
)
{
bool
set
=
false
;
bool
set
=
false
;
pthread_mutex_lock
(
&
pVnode
->
statusMutex
);
pthread_mutex_lock
(
&
pVnode
->
statusMutex
);
...
...
src/vnode/src/vnodeWrite.c
浏览文件 @
f2320dc7
...
@@ -120,12 +120,6 @@ static int32_t vnodeCheckWrite(SVnodeObj *pVnode) {
...
@@ -120,12 +120,6 @@ static int32_t vnodeCheckWrite(SVnodeObj *pVnode) {
return
TSDB_CODE_APP_NOT_READY
;
return
TSDB_CODE_APP_NOT_READY
;
}
}
if
(
vnodeInClosingStatus
(
pVnode
))
{
vDebug
(
"vgId:%d, vnode status is %s, refCount:%d pVnode:%p"
,
pVnode
->
vgId
,
vnodeStatus
[
pVnode
->
status
],
pVnode
->
refCount
,
pVnode
);
return
TSDB_CODE_APP_NOT_READY
;
}
if
(
pVnode
->
isFull
)
{
if
(
pVnode
->
isFull
)
{
vDebug
(
"vgId:%d, vnode is full, refCount:%d"
,
pVnode
->
vgId
,
pVnode
->
refCount
);
vDebug
(
"vgId:%d, vnode is full, refCount:%d"
,
pVnode
->
vgId
,
pVnode
->
refCount
);
return
TSDB_CODE_VND_IS_FULL
;
return
TSDB_CODE_VND_IS_FULL
;
...
@@ -254,6 +248,14 @@ static int32_t vnodeWriteToWQueueImp(SVWriteMsg *pWrite) {
...
@@ -254,6 +248,14 @@ static int32_t vnodeWriteToWQueueImp(SVWriteMsg *pWrite) {
}
}
}
}
if
(
!
vnodeInReadyStatus
(
pVnode
))
{
vDebug
(
"vgId:%d, vnode status is %s, refCount:%d pVnode:%p"
,
pVnode
->
vgId
,
vnodeStatus
[
pVnode
->
status
],
pVnode
->
refCount
,
pVnode
);
taosFreeQitem
(
pWrite
);
vnodeRelease
(
pVnode
);
return
TSDB_CODE_APP_NOT_READY
;
}
int32_t
queued
=
atomic_add_fetch_32
(
&
pVnode
->
queuedWMsg
,
1
);
int32_t
queued
=
atomic_add_fetch_32
(
&
pVnode
->
queuedWMsg
,
1
);
if
(
queued
>
MAX_QUEUED_MSG_NUM
)
{
if
(
queued
>
MAX_QUEUED_MSG_NUM
)
{
int32_t
ms
=
(
queued
/
MAX_QUEUED_MSG_NUM
)
*
10
+
3
;
int32_t
ms
=
(
queued
/
MAX_QUEUED_MSG_NUM
)
*
10
+
3
;
...
@@ -338,3 +340,5 @@ static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) {
...
@@ -338,3 +340,5 @@ static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) {
return
TSDB_CODE_VND_ACTION_IN_PROGRESS
;
return
TSDB_CODE_VND_ACTION_IN_PROGRESS
;
}
}
}
}
void
vnodeWaitWriteCompleted
(
void
*
pVnode
)
{}
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录