Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a31d9ece
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
a31d9ece
编写于
11月 26, 2020
作者:
S
Shengliang Guan
提交者:
GitHub
11月 26, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #4355 from taosdata/feature/sync
[TD-2166]<fix>: add flow control when synchronizatio fails
上级
a5029620
810023ca
变更
22
隐藏空白更改
内联
并排
Showing
22 changed file
with
230 addition
and
30 deletion
+230
-30
src/common/inc/tglobal.h
src/common/inc/tglobal.h
+1
-0
src/common/src/tglobal.c
src/common/src/tglobal.c
+12
-0
src/dnode/src/dnodeMPeer.c
src/dnode/src/dnodeMPeer.c
+1
-1
src/dnode/src/dnodeMRead.c
src/dnode/src/dnodeMRead.c
+1
-1
src/dnode/src/dnodeMWrite.c
src/dnode/src/dnodeMWrite.c
+1
-1
src/dnode/src/dnodeVRead.c
src/dnode/src/dnodeVRead.c
+1
-1
src/dnode/src/dnodeVWrite.c
src/dnode/src/dnodeVWrite.c
+2
-2
src/inc/taoserror.h
src/inc/taoserror.h
+1
-0
src/inc/tsync.h
src/inc/tsync.h
+3
-3
src/inc/vnode.h
src/inc/vnode.h
+4
-2
src/os/inc/osAlpine.h
src/os/inc/osAlpine.h
+1
-0
src/os/inc/osArm32.h
src/os/inc/osArm32.h
+1
-0
src/os/inc/osArm64.h
src/os/inc/osArm64.h
+1
-0
src/os/inc/osDarwin.h
src/os/inc/osDarwin.h
+1
-0
src/os/inc/osLinux32.h
src/os/inc/osLinux32.h
+1
-0
src/os/inc/osLinux64.h
src/os/inc/osLinux64.h
+1
-0
src/os/inc/osWindows.h
src/os/inc/osWindows.h
+1
-0
src/sync/src/syncRetrieve.c
src/sync/src/syncRetrieve.c
+2
-3
src/vnode/inc/vnodeInt.h
src/vnode/inc/vnodeInt.h
+1
-1
src/vnode/src/vnodeMain.c
src/vnode/src/vnodeMain.c
+6
-8
src/vnode/src/vnodeWrite.c
src/vnode/src/vnodeWrite.c
+56
-7
tests/script/unique/cluster/flowctrl.sim
tests/script/unique/cluster/flowctrl.sim
+131
-0
未找到文件。
src/common/inc/tglobal.h
浏览文件 @
a31d9ece
...
...
@@ -97,6 +97,7 @@ extern int32_t tsAlternativeRole;
extern
int32_t
tsBalanceInterval
;
extern
int32_t
tsOfflineThreshold
;
extern
int32_t
tsMnodeEqualVnodeNum
;
extern
int32_t
tsFlowCtrl
;
// restful
extern
int32_t
tsEnableHttpModule
;
...
...
src/common/src/tglobal.c
浏览文件 @
a31d9ece
...
...
@@ -133,6 +133,7 @@ int32_t tsAlternativeRole = 0;
int32_t
tsBalanceInterval
=
300
;
// seconds
int32_t
tsOfflineThreshold
=
86400
*
100
;
// seconds 10days
int32_t
tsMnodeEqualVnodeNum
=
4
;
int32_t
tsFlowCtrl
=
1
;
// restful
int32_t
tsEnableHttpModule
=
1
;
...
...
@@ -971,6 +972,17 @@ static void doInitGlobalConfig(void) {
cfg
.
maxValue
=
1000
;
cfg
.
ptrLength
=
0
;
cfg
.
unitType
=
TAOS_CFG_UTYPE_NONE
;
taosInitConfigOption
(
cfg
);
// module configs
cfg
.
option
=
"flowctrl"
;
cfg
.
ptr
=
&
tsFlowCtrl
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT32
;
cfg
.
cfgType
=
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_SHOW
;
cfg
.
minValue
=
0
;
cfg
.
maxValue
=
1
;
cfg
.
ptrLength
=
0
;
cfg
.
unitType
=
TAOS_CFG_UTYPE_NONE
;
taosInitConfigOption
(
cfg
);
cfg
.
option
=
"http"
;
...
...
src/dnode/src/dnodeMPeer.c
浏览文件 @
a31d9ece
...
...
@@ -162,7 +162,7 @@ static void *dnodeProcessMPeerQueue(void *param) {
break
;
}
d
Debug
(
"msg:%s will be processed in mpeer queue"
,
taosMsg
[
pPeerMsg
->
rpcMsg
.
msgType
]);
d
Trace
(
"msg:%s will be processed in mpeer queue"
,
taosMsg
[
pPeerMsg
->
rpcMsg
.
msgType
]);
int32_t
code
=
mnodeProcessPeerReq
(
pPeerMsg
);
dnodeSendRpcMPeerRsp
(
pPeerMsg
,
code
);
}
...
...
src/dnode/src/dnodeMRead.c
浏览文件 @
a31d9ece
...
...
@@ -168,7 +168,7 @@ static void *dnodeProcessMReadQueue(void *param) {
break
;
}
d
Debug
(
"msg:%p, app:%p type:%s will be processed in mread queue"
,
pRead
->
rpcMsg
.
ahandle
,
pRead
,
d
Trace
(
"msg:%p, app:%p type:%s will be processed in mread queue"
,
pRead
->
rpcMsg
.
ahandle
,
pRead
,
taosMsg
[
pRead
->
rpcMsg
.
msgType
]);
int32_t
code
=
mnodeProcessRead
(
pRead
);
dnodeSendRpcMReadRsp
(
pRead
,
code
);
...
...
src/dnode/src/dnodeMWrite.c
浏览文件 @
a31d9ece
...
...
@@ -174,7 +174,7 @@ static void *dnodeProcessMWriteQueue(void *param) {
break
;
}
d
Debug
(
"msg:%p, app:%p type:%s will be processed in mwrite queue"
,
pWrite
,
pWrite
->
rpcMsg
.
ahandle
,
d
Trace
(
"msg:%p, app:%p type:%s will be processed in mwrite queue"
,
pWrite
,
pWrite
->
rpcMsg
.
ahandle
,
taosMsg
[
pWrite
->
rpcMsg
.
msgType
]);
int32_t
code
=
mnodeProcessWrite
(
pWrite
);
...
...
src/dnode/src/dnodeVRead.c
浏览文件 @
a31d9ece
...
...
@@ -175,7 +175,7 @@ static void *dnodeProcessReadQueue(void *pWorker) {
break
;
}
d
Debug
(
"msg:%p, app:%p type:%s will be processed in vread queue, qtype:%d"
,
pRead
,
pRead
->
rpcAhandle
,
d
Trace
(
"msg:%p, app:%p type:%s will be processed in vread queue, qtype:%d"
,
pRead
,
pRead
->
rpcAhandle
,
taosMsg
[
pRead
->
msgType
],
qtype
);
int32_t
code
=
vnodeProcessRead
(
pVnode
,
pRead
);
...
...
src/dnode/src/dnodeVWrite.c
浏览文件 @
a31d9ece
...
...
@@ -176,7 +176,7 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *wparam, int32_t code) {
if
(
count
<=
1
)
return
;
SRpcMsg
rpcRsp
=
{
.
handle
=
pWrite
->
rpc
H
andle
,
.
handle
=
pWrite
->
rpc
Msg
.
h
andle
,
.
pCont
=
pWrite
->
rspRet
.
rsp
,
.
contLen
=
pWrite
->
rspRet
.
len
,
.
code
=
pWrite
->
code
,
...
...
@@ -206,7 +206,7 @@ static void *dnodeProcessVWriteQueue(void *wparam) {
for
(
int32_t
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
taosGetQitem
(
pWorker
->
qall
,
&
qtype
,
(
void
**
)
&
pWrite
);
dTrace
(
"msg:%p, app:%p type:%s will be processed in vwrite queue, qtype:%s hver:%"
PRIu64
,
pWrite
,
pWrite
->
rpc
A
handle
,
taosMsg
[
pWrite
->
pHead
->
msgType
],
qtypeStr
[
qtype
],
pWrite
->
pHead
->
version
);
pWrite
->
rpc
Msg
.
a
handle
,
taosMsg
[
pWrite
->
pHead
->
msgType
],
qtypeStr
[
qtype
],
pWrite
->
pHead
->
version
);
pWrite
->
code
=
vnodeProcessWrite
(
pVnode
,
pWrite
->
pHead
,
qtype
,
&
pWrite
->
rspRet
);
if
(
pWrite
->
code
<=
0
)
pWrite
->
processedCount
=
1
;
...
...
src/inc/taoserror.h
浏览文件 @
a31d9ece
...
...
@@ -205,6 +205,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VRESION_FILE, 0, 0x050A, "Invalid ve
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_IS_FULL
,
0
,
0x050B
,
"Vnode memory is full because commit failed"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_NOT_SYNCED
,
0
,
0x0511
,
"Database suspended"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_NO_WRITE_AUTH
,
0
,
0x0512
,
"Write operation denied"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_SYNCING
,
0
,
0x0513
,
"Database is syncing"
)
// tsdb
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_INVALID_TABLE_ID
,
0
,
0x0600
,
"Invalid table ID"
)
...
...
src/inc/tsync.h
浏览文件 @
a31d9ece
...
...
@@ -23,7 +23,7 @@ extern "C" {
#define TAOS_SYNC_MAX_REPLICA 5
#define TAOS_SYNC_MAX_INDEX 0x7FFFFFFF
typedef
enum
_TAOS_SYNC_ROLE
{
typedef
enum
{
TAOS_SYNC_ROLE_OFFLINE
=
0
,
TAOS_SYNC_ROLE_UNSYNCED
=
1
,
TAOS_SYNC_ROLE_SYNCING
=
2
,
...
...
@@ -31,7 +31,7 @@ typedef enum _TAOS_SYNC_ROLE {
TAOS_SYNC_ROLE_MASTER
=
4
}
ESyncRole
;
typedef
enum
_TAOS_SYNC_STATUS
{
typedef
enum
{
TAOS_SYNC_STATUS_INIT
=
0
,
TAOS_SYNC_STATUS_START
=
1
,
TAOS_SYNC_STATUS_FILE
=
2
,
...
...
@@ -80,7 +80,7 @@ typedef void (*FConfirmForward)(int32_t vgId, void *mhandle, int32_t code);
typedef
void
(
*
FNotifyRole
)(
int32_t
vgId
,
int8_t
role
);
// if a number of retrieving data failed, call this to start flow control
typedef
void
(
*
FNotifyFlowCtrl
)(
int32_t
vgId
,
int32_t
mseconds
);
typedef
void
(
*
FNotifyFlowCtrl
)(
int32_t
vgId
,
int32_t
level
);
// when data file is synced successfully, notity app
typedef
int32_t
(
*
FNotifyFileSynced
)(
int32_t
vgId
,
uint64_t
fversion
);
...
...
src/inc/vnode.h
浏览文件 @
a31d9ece
...
...
@@ -20,6 +20,7 @@
extern
"C"
{
#endif
#include "trpc.h"
#include "twal.h"
typedef
enum
_VN_STATUS
{
...
...
@@ -51,8 +52,9 @@ typedef struct {
typedef
struct
{
int32_t
code
;
int32_t
processedCount
;
void
*
rpcHandle
;
void
*
rpcAhandle
;
int32_t
qtype
;
void
*
pVnode
;
SRpcMsg
rpcMsg
;
SRspRet
rspRet
;
char
reserveForSync
[
16
];
SWalHead
pHead
[];
...
...
src/os/inc/osAlpine.h
浏览文件 @
a31d9ece
...
...
@@ -77,6 +77,7 @@ extern "C" {
#include <sys/utsname.h>
#include <sys/resource.h>
#include <linux/sysctl.h>
#include <math.h>
typedef
int
(
*
__compar_fn_t
)(
const
void
*
,
const
void
*
);
void
error
(
int
,
int
,
const
char
*
);
...
...
src/os/inc/osArm32.h
浏览文件 @
a31d9ece
...
...
@@ -76,6 +76,7 @@ extern "C" {
#include <sys/utsname.h>
#include <sys/resource.h>
#include <error.h>
#include <math.h>
#define TAOS_OS_FUNC_LZ4
#define BUILDIN_CLZL(val) __builtin_clzll(val)
...
...
src/os/inc/osArm64.h
浏览文件 @
a31d9ece
...
...
@@ -77,6 +77,7 @@ extern "C" {
#include <sys/resource.h>
#include <error.h>
#include <linux/sysctl.h>
#include <math.h>
#ifdef __cplusplus
}
...
...
src/os/inc/osDarwin.h
浏览文件 @
a31d9ece
...
...
@@ -70,6 +70,7 @@ extern "C" {
#include <dispatch/dispatch.h>
#include <fcntl.h>
#include <sys/utsname.h>
#include <math.h>
#define TAOS_OS_FUNC_FILE_SENDIFLE
...
...
src/os/inc/osLinux32.h
浏览文件 @
a31d9ece
...
...
@@ -76,6 +76,7 @@ extern "C" {
#include <sys/utsname.h>
#include <sys/resource.h>
#include <error.h>
#include <math.h>
#define TAOS_OS_FUNC_LZ4
#define BUILDIN_CLZL(val) __builtin_clzll(val)
...
...
src/os/inc/osLinux64.h
浏览文件 @
a31d9ece
...
...
@@ -79,6 +79,7 @@ extern "C" {
#include <error.h>
#endif
#include <linux/sysctl.h>
#include <math.h>
#ifdef __cplusplus
}
...
...
src/os/inc/osWindows.h
浏览文件 @
a31d9ece
...
...
@@ -40,6 +40,7 @@
#include <time.h>
#include <inttypes.h>
#include <conio.h>
#include <math.h>
#include "msvcProcess.h"
#include "msvcDirect.h"
#include "msvcFcntl.h"
...
...
src/sync/src/syncRetrieve.c
浏览文件 @
a31d9ece
...
...
@@ -504,6 +504,8 @@ void *syncRetrieveData(void *param) {
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
taosBlockSIGPIPE
();
if
(
pNode
->
notifyFlowCtrl
)
(
*
pNode
->
notifyFlowCtrl
)(
pNode
->
vgId
,
pPeer
->
numOfRetrieves
);
pPeer
->
fileChanged
=
0
;
pPeer
->
syncFd
=
taosOpenTcpClientSocket
(
pPeer
->
ip
,
pPeer
->
port
,
0
);
if
(
pPeer
->
syncFd
<
0
)
{
...
...
@@ -520,10 +522,7 @@ void *syncRetrieveData(void *param) {
}
if
(
pPeer
->
fileChanged
)
{
// if file is changed 3 times continuously, start flow control
pPeer
->
numOfRetrieves
++
;
if
(
pPeer
->
numOfRetrieves
>=
2
&&
pNode
->
notifyFlowCtrl
)
(
*
pNode
->
notifyFlowCtrl
)(
pNode
->
vgId
,
4
<<
(
pPeer
->
numOfRetrieves
-
2
));
}
else
{
pPeer
->
numOfRetrieves
=
0
;
if
(
pNode
->
notifyFlowCtrl
)
(
*
pNode
->
notifyFlowCtrl
)(
pNode
->
vgId
,
0
);
...
...
src/vnode/inc/vnodeInt.h
浏览文件 @
a31d9ece
...
...
@@ -39,7 +39,7 @@ typedef struct {
int32_t
refCount
;
// reference count
int32_t
queuedWMsg
;
int32_t
queuedRMsg
;
int32_t
delayMs
;
int32_t
flowctrlLevel
;
int8_t
status
;
int8_t
role
;
int8_t
accessState
;
...
...
src/vnode/src/vnodeMain.c
浏览文件 @
a31d9ece
...
...
@@ -34,7 +34,7 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno);
static
uint32_t
vnodeGetFileInfo
(
int32_t
vgId
,
char
*
name
,
uint32_t
*
index
,
uint32_t
eindex
,
int64_t
*
size
,
uint64_t
*
fversion
);
static
int32_t
vnodeGetWalInfo
(
int32_t
vgId
,
char
*
fileName
,
int64_t
*
fileId
);
static
void
vnodeNotifyRole
(
int32_t
vgId
,
int8_t
role
);
static
void
vnodeCtrlFlow
(
int32_t
vgId
,
int32_t
mseconds
);
static
void
vnodeCtrlFlow
(
int32_t
vgId
,
int32_t
level
);
static
int32_t
vnodeNotifyFileSynced
(
int32_t
vgId
,
uint64_t
fversion
);
static
void
vnodeConfirmForard
(
int32_t
vgId
,
void
*
wparam
,
int32_t
code
);
static
int32_t
vnodeWriteToCache
(
int32_t
vgId
,
void
*
wparam
,
int32_t
qtype
,
void
*
rparam
);
...
...
@@ -660,7 +660,7 @@ static int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId) {
static
void
vnodeNotifyRole
(
int32_t
vgId
,
int8_t
role
)
{
SVnodeObj
*
pVnode
=
vnodeAcquire
(
vgId
);
if
(
pVnode
==
NULL
)
{
v
Error
(
"vgId:%d, vnode not found while notify role"
,
vgId
);
v
Trace
(
"vgId:%d, vnode not found while notify role"
,
vgId
);
return
;
}
...
...
@@ -677,17 +677,15 @@ static void vnodeNotifyRole(int32_t vgId, int8_t role) {
vnodeRelease
(
pVnode
);
}
static
void
vnodeCtrlFlow
(
int32_t
vgId
,
int32_t
mseconds
)
{
static
void
vnodeCtrlFlow
(
int32_t
vgId
,
int32_t
level
)
{
SVnodeObj
*
pVnode
=
vnodeAcquire
(
vgId
);
if
(
pVnode
==
NULL
)
{
v
Error
(
"vgId:%d, vnode not found while ctrl flow
"
,
vgId
);
v
Trace
(
"vgId:%d, vnode not found while flow ctrl
"
,
vgId
);
return
;
}
if
(
pVnode
->
delayMs
!=
mseconds
)
{
pVnode
->
delayMs
=
mseconds
;
vDebug
(
"vgId:%d, sync flow control, mseconds:%d"
,
pVnode
->
vgId
,
mseconds
);
}
pVnode
->
flowctrlLevel
=
level
;
vDebug
(
"vgId:%d, set flowctrl level:%d"
,
pVnode
->
vgId
,
level
);
vnodeRelease
(
pVnode
);
}
...
...
src/vnode/src/vnodeWrite.c
浏览文件 @
a31d9ece
...
...
@@ -17,19 +17,23 @@
#include "os.h"
#include "taosmsg.h"
#include "taoserror.h"
#include "tglobal.h"
#include "tqueue.h"
#include "trpc.h"
#include "tsdb.h"
#include "twal.h"
#include "tsync.h"
#include "ttimer.h"
#include "tdataformat.h"
#include "vnode.h"
#include "vnodeInt.h"
#include "syncInt.h"
#include "tcq.h"
#include "dnode.h"
#define MAX_QUEUED_MSG_NUM 10000
extern
void
*
tsDnodeTmr
;
static
int32_t
(
*
vnodeProcessWriteMsgFp
[
TSDB_MSG_TYPE_MAX
])(
SVnodeObj
*
,
void
*
pCont
,
SRspRet
*
);
static
int32_t
vnodeProcessSubmitMsg
(
SVnodeObj
*
pVnode
,
void
*
pCont
,
SRspRet
*
);
static
int32_t
vnodeProcessCreateTableMsg
(
SVnodeObj
*
pVnode
,
void
*
pCont
,
SRspRet
*
);
...
...
@@ -37,6 +41,7 @@ static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet
static
int32_t
vnodeProcessAlterTableMsg
(
SVnodeObj
*
pVnode
,
void
*
pCont
,
SRspRet
*
);
static
int32_t
vnodeProcessDropStableMsg
(
SVnodeObj
*
pVnode
,
void
*
pCont
,
SRspRet
*
);
static
int32_t
vnodeProcessUpdateTagValMsg
(
SVnodeObj
*
pVnode
,
void
*
pCont
,
SRspRet
*
);
static
int32_t
vnodePerformFlowCtrl
(
SVWriteMsg
*
pWrite
);
void
vnodeInitWriteFp
(
void
)
{
vnodeProcessWriteMsgFp
[
TSDB_MSG_TYPE_SUBMIT
]
=
vnodeProcessSubmitMsg
;
...
...
@@ -77,8 +82,6 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara
// assign version
pHead
->
version
=
pVnode
->
version
+
1
;
if
(
pVnode
->
delayMs
)
taosMsleep
(
pVnode
->
delayMs
);
}
else
{
// from wal or forward
// for data from WAL or forward, version may be smaller
if
(
pHead
->
version
<=
pVnode
->
version
)
return
0
;
...
...
@@ -218,9 +221,10 @@ static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspR
int32_t
vnodeWriteToWQueue
(
void
*
vparam
,
void
*
wparam
,
int32_t
qtype
,
void
*
rparam
)
{
SVnodeObj
*
pVnode
=
vparam
;
SWalHead
*
pHead
=
wparam
;
int32_t
code
=
0
;
if
(
qtype
==
TAOS_QTYPE_RPC
)
{
int32_t
code
=
vnodeCheckWrite
(
pVnode
);
code
=
vnodeCheckWrite
(
pVnode
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
return
code
;
}
...
...
@@ -237,11 +241,12 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar
if
(
rparam
!=
NULL
)
{
SRpcMsg
*
pRpcMsg
=
rparam
;
pWrite
->
rpcHandle
=
pRpcMsg
->
handle
;
pWrite
->
rpcAhandle
=
pRpcMsg
->
ahandle
;
pWrite
->
rpcMsg
=
*
pRpcMsg
;
}
memcpy
(
pWrite
->
pHead
,
pHead
,
sizeof
(
SWalHead
)
+
pHead
->
len
);
pWrite
->
pVnode
=
pVnode
;
pWrite
->
qtype
=
qtype
;
atomic_add_fetch_32
(
&
pVnode
->
refCount
,
1
);
...
...
@@ -251,6 +256,9 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar
taosMsleep
(
1
);
}
code
=
vnodePerformFlowCtrl
(
pWrite
);
if
(
code
!=
0
)
return
0
;
vTrace
(
"vgId:%d, write into vwqueue, refCount:%d queued:%d"
,
pVnode
->
vgId
,
pVnode
->
refCount
,
pVnode
->
queuedWMsg
);
taosWriteQitem
(
pVnode
->
wqueue
,
qtype
,
pWrite
);
...
...
@@ -260,9 +268,50 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar
void
vnodeFreeFromWQueue
(
void
*
vparam
,
SVWriteMsg
*
pWrite
)
{
SVnodeObj
*
pVnode
=
vparam
;
atomic_sub_fetch_32
(
&
pVnode
->
queuedWMsg
,
1
);
vTrace
(
"vgId:%d,
free from vwqueue, refCount:%d queued:%d"
,
pVnode
->
vgId
,
pVnode
->
refCount
,
pVnode
->
queuedWMsg
);
int32_t
queued
=
atomic_sub_fetch_32
(
&
pVnode
->
queuedWMsg
,
1
);
vTrace
(
"vgId:%d,
msg:%p, app:%p, free from vwqueue, queued:%d"
,
pVnode
->
vgId
,
pWrite
,
pWrite
->
rpcMsg
.
ahandle
,
queued
);
taosFreeQitem
(
pWrite
);
vnodeRelease
(
pVnode
);
}
static
void
vnodeFlowCtrlMsgToWQueue
(
void
*
param
,
void
*
tmrId
)
{
SVWriteMsg
*
pWrite
=
param
;
SVnodeObj
*
pVnode
=
pWrite
->
pVnode
;
int32_t
code
=
TSDB_CODE_VND_SYNCING
;
pWrite
->
processedCount
++
;
if
(
pWrite
->
processedCount
>
100
)
{
vError
(
"vgId:%d, msg:%p, failed to process since %s"
,
pVnode
->
vgId
,
pWrite
,
tstrerror
(
code
));
pWrite
->
processedCount
=
1
;
dnodeSendRpcVWriteRsp
(
pWrite
->
pVnode
,
pWrite
,
code
);
}
else
{
code
=
vnodePerformFlowCtrl
(
pWrite
);
if
(
code
==
0
)
{
vTrace
(
"vgId:%d, write into vwqueue after flowctrl"
,
pVnode
->
vgId
);
pWrite
->
processedCount
=
0
;
taosWriteQitem
(
pVnode
->
wqueue
,
pWrite
->
qtype
,
pWrite
);
}
}
}
static
int32_t
vnodePerformFlowCtrl
(
SVWriteMsg
*
pWrite
)
{
SVnodeObj
*
pVnode
=
pWrite
->
pVnode
;
if
(
pVnode
->
flowctrlLevel
<=
0
)
return
0
;
if
(
pWrite
->
qtype
!=
TAOS_QTYPE_RPC
)
return
0
;
if
(
tsFlowCtrl
==
0
)
{
int32_t
ms
=
pow
(
2
,
pVnode
->
flowctrlLevel
+
2
);
if
(
ms
>
100
)
ms
=
100
;
vTrace
(
"vgId:%d, msg:%p, app:%p, perform flowctrl for %d ms"
,
pVnode
->
vgId
,
pWrite
,
pWrite
->
rpcMsg
.
ahandle
,
ms
);
taosMsleep
(
ms
);
return
0
;
}
else
{
void
*
unUsed
=
NULL
;
taosTmrReset
(
vnodeFlowCtrlMsgToWQueue
,
100
,
pWrite
,
tsDnodeTmr
,
&
unUsed
);
vTrace
(
"vgId:%d, msg:%p, app:%p, perform flowctrl, count:%d"
,
pVnode
->
vgId
,
pWrite
,
pWrite
->
rpcMsg
.
ahandle
,
pWrite
->
processedCount
);
return
TSDB_CODE_VND_ACTION_IN_PROGRESS
;
}
}
tests/script/unique/cluster/flowctrl.sim
0 → 100644
浏览文件 @
a31d9ece
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/cfg.sh -n dnode1 -c numOfMnodes -v 3
system sh/cfg.sh -n dnode2 -c numOfMnodes -v 3
system sh/cfg.sh -n dnode3 -c numOfMnodes -v 3
system sh/cfg.sh -n dnode1 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode2 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode3 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode1 -c http -v 0
system sh/cfg.sh -n dnode2 -c http -v 0
system sh/cfg.sh -n dnode3 -c http -v 0
system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 20000
system sh/cfg.sh -n dnode2 -c maxTablesPerVnode -v 20000
system sh/cfg.sh -n dnode3 -c maxTablesPerVnode -v 20000
system sh/cfg.sh -n dnode1 -c maxVgroupsPerDb -v 20
system sh/cfg.sh -n dnode2 -c maxVgroupsPerDb -v 20
system sh/cfg.sh -n dnode3 -c maxVgroupsPerDb -v 20
system sh/cfg.sh -n dnode1 -c replica -v 3
system sh/cfg.sh -n dnode2 -c replica -v 3
system sh/cfg.sh -n dnode3 -c replica -v 3
print ============== deploy
system sh/exec.sh -n dnode1 -s start
sleep 5001
sql connect
sql create dnode $hostname2
sql create dnode $hostname3
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
print =============== step1
$x = 0
show1:
$x = $x + 1
sleep 2000
if $x == 5 then
return -1
endi
sql show mnodes -x show1
$mnode1Role = $data2_1
print mnode1Role $mnode1Role
$mnode2Role = $data2_2
print mnode2Role $mnode2Role
$mnode3Role = $data2_3
print mnode3Role $mnode3Role
if $mnode1Role != master then
goto show1
endi
if $mnode2Role != slave then
goto show1
endi
if $mnode3Role != slave then
goto show1
endi
print =============== step2
sql create database db replica 3
sql use db
sql create table tb (ts timestamp, test int)
$x = 0
while $x < 100
$ms = $x . s
sql insert into tb values (now + $ms , $x )
$x = $x + 1
endw
print =============== step3
sleep 3000
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
print =============== step4
sleep 5000
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
print =============== step5
sleep 8000
while $x < 200
$ms = $x . s
sql insert into tb values (now + $ms , $x )
$x = $x + 1
endw
print =============== step6
system sh/exec.sh -n dnode2 -s stop -x SIGINT
sleep 3000
while $x < 300
$ms = $x . s
sql insert into tb values (now + $ms , $x )
$x = $x + 1
endw
system sh/exec.sh -n dnode2 -s start
sleep 6000
print =============== step7
while $x < 400
$ms = $x . s
sql insert into tb values (now + $ms , $x )
$x = $x + 1
sleep 1
endw
print =============== step8
sql select * from tb
print rows $rows
if $rows != 400 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录