diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index 4087f638a95b063fcf6081db2556758643ef1c9a..efe3d7678a39f140b413192adfe9654b3661251f 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -97,6 +97,7 @@ extern int32_t tsAlternativeRole; extern int32_t tsBalanceInterval; extern int32_t tsOfflineThreshold; extern int32_t tsMnodeEqualVnodeNum; +extern int32_t tsFlowCtrl; // restful extern int32_t tsEnableHttpModule; diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 4495c3d9288a5df0b8944aa444625c143b5c7670..f8bb965d282f994c35d32fabeb4c3ddba2fb4c27 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -133,6 +133,7 @@ int32_t tsAlternativeRole = 0; int32_t tsBalanceInterval = 300; // seconds int32_t tsOfflineThreshold = 86400*100; // seconds 10days int32_t tsMnodeEqualVnodeNum = 4; +int32_t tsFlowCtrl = 1; // restful int32_t tsEnableHttpModule = 1; @@ -971,6 +972,17 @@ static void doInitGlobalConfig(void) { cfg.maxValue = 1000; cfg.ptrLength = 0; cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + + // module configs + cfg.option = "flowctrl"; + cfg.ptr = &tsFlowCtrl; + cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; + cfg.minValue = 0; + cfg.maxValue = 1; + cfg.ptrLength = 0; + cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); cfg.option = "http"; diff --git a/src/dnode/src/dnodeMPeer.c b/src/dnode/src/dnodeMPeer.c index 05b37bd3388c22046d2a0dd1827655c97c2e40ad..ee6dc5212e4fdb98d4bfb7c3a7678fb6bd81bc8c 100644 --- a/src/dnode/src/dnodeMPeer.c +++ b/src/dnode/src/dnodeMPeer.c @@ -162,7 +162,7 @@ static void *dnodeProcessMPeerQueue(void *param) { break; } - dDebug("msg:%s will be processed in mpeer queue", taosMsg[pPeerMsg->rpcMsg.msgType]); + dTrace("msg:%s will be processed in mpeer queue", taosMsg[pPeerMsg->rpcMsg.msgType]); int32_t code = mnodeProcessPeerReq(pPeerMsg); dnodeSendRpcMPeerRsp(pPeerMsg, code); } diff --git a/src/dnode/src/dnodeMRead.c b/src/dnode/src/dnodeMRead.c index ee9da72e4d3424ffb97b6cbc90098edd1abf3a6d..65f3af7b3bf13efe0ffa80ca5752d66cc4a43e9e 100644 --- a/src/dnode/src/dnodeMRead.c +++ b/src/dnode/src/dnodeMRead.c @@ -168,7 +168,7 @@ static void *dnodeProcessMReadQueue(void *param) { break; } - dDebug("msg:%p, app:%p type:%s will be processed in mread queue", pRead->rpcMsg.ahandle, pRead, + dTrace("msg:%p, app:%p type:%s will be processed in mread queue", pRead->rpcMsg.ahandle, pRead, taosMsg[pRead->rpcMsg.msgType]); int32_t code = mnodeProcessRead(pRead); dnodeSendRpcMReadRsp(pRead, code); diff --git a/src/dnode/src/dnodeMWrite.c b/src/dnode/src/dnodeMWrite.c index 65c0d5381969f291d40245bc31dc4acdf849f5ac..ef2d49ef4233eb94f3efd82f77a6918b2b671ffd 100644 --- a/src/dnode/src/dnodeMWrite.c +++ b/src/dnode/src/dnodeMWrite.c @@ -174,7 +174,7 @@ static void *dnodeProcessMWriteQueue(void *param) { break; } - dDebug("msg:%p, app:%p type:%s will be processed in mwrite queue", pWrite, pWrite->rpcMsg.ahandle, + dTrace("msg:%p, app:%p type:%s will be processed in mwrite queue", pWrite, pWrite->rpcMsg.ahandle, taosMsg[pWrite->rpcMsg.msgType]); int32_t code = mnodeProcessWrite(pWrite); diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index 4cce54bf59eecbef6a7056e005819d456178279a..b42a627a3a78031be3191d7be5fb1202723dcc57 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -175,7 +175,7 @@ static void *dnodeProcessReadQueue(void *pWorker) { break; } - dDebug("msg:%p, app:%p type:%s will be processed in vread queue, qtype:%d", pRead, pRead->rpcAhandle, + dTrace("msg:%p, app:%p type:%s will be processed in vread queue, qtype:%d", pRead, pRead->rpcAhandle, taosMsg[pRead->msgType], qtype); int32_t code = vnodeProcessRead(pVnode, pRead); diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index 959eb3c9c532c249a04a67ae2d8b34992e9bcdc6..6d4b50ee542b565669f0a4782142a3919c19ebe8 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -176,7 +176,7 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *wparam, int32_t code) { if (count <= 1) return; SRpcMsg rpcRsp = { - .handle = pWrite->rpcHandle, + .handle = pWrite->rpcMsg.handle, .pCont = pWrite->rspRet.rsp, .contLen = pWrite->rspRet.len, .code = pWrite->code, @@ -206,7 +206,7 @@ static void *dnodeProcessVWriteQueue(void *wparam) { for (int32_t i = 0; i < numOfMsgs; ++i) { taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite); dTrace("msg:%p, app:%p type:%s will be processed in vwrite queue, qtype:%s hver:%" PRIu64, pWrite, - pWrite->rpcAhandle, taosMsg[pWrite->pHead->msgType], qtypeStr[qtype], pWrite->pHead->version); + pWrite->rpcMsg.ahandle, taosMsg[pWrite->pHead->msgType], qtypeStr[qtype], pWrite->pHead->version); pWrite->code = vnodeProcessWrite(pVnode, pWrite->pHead, qtype, &pWrite->rspRet); if (pWrite->code <= 0) pWrite->processedCount = 1; diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index f1e736130c5f036e59b39e65f4533c5b473e294e..77ec5350ba0205ebff94e8b740eb90b458cf12e6 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -205,6 +205,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VRESION_FILE, 0, 0x050A, "Invalid ve TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FULL, 0, 0x050B, "Vnode memory is full because commit failed") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, 0, 0x0511, "Database suspended") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, 0, 0x0512, "Write operation denied") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_SYNCING, 0, 0x0513, "Database is syncing") // tsdb TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, 0, 0x0600, "Invalid table ID") diff --git a/src/inc/tsync.h b/src/inc/tsync.h index 77a3b36e73ec6a151e1f8b23e26673d3113dc012..398e1bf97c739406d1f8f568c998d891191b71bf 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -23,7 +23,7 @@ extern "C" { #define TAOS_SYNC_MAX_REPLICA 5 #define TAOS_SYNC_MAX_INDEX 0x7FFFFFFF -typedef enum _TAOS_SYNC_ROLE { +typedef enum { TAOS_SYNC_ROLE_OFFLINE = 0, TAOS_SYNC_ROLE_UNSYNCED = 1, TAOS_SYNC_ROLE_SYNCING = 2, @@ -31,7 +31,7 @@ typedef enum _TAOS_SYNC_ROLE { TAOS_SYNC_ROLE_MASTER = 4 } ESyncRole; -typedef enum _TAOS_SYNC_STATUS { +typedef enum { TAOS_SYNC_STATUS_INIT = 0, TAOS_SYNC_STATUS_START = 1, TAOS_SYNC_STATUS_FILE = 2, @@ -80,7 +80,7 @@ typedef void (*FConfirmForward)(int32_t vgId, void *mhandle, int32_t code); typedef void (*FNotifyRole)(int32_t vgId, int8_t role); // if a number of retrieving data failed, call this to start flow control -typedef void (*FNotifyFlowCtrl)(int32_t vgId, int32_t mseconds); +typedef void (*FNotifyFlowCtrl)(int32_t vgId, int32_t level); // when data file is synced successfully, notity app typedef int32_t (*FNotifyFileSynced)(int32_t vgId, uint64_t fversion); diff --git a/src/inc/vnode.h b/src/inc/vnode.h index 4e8389498b5ced87f4ff07dddac817614f5af368..5f643295d6e3d70999de55da47470ad0f0c0df93 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -20,6 +20,7 @@ extern "C" { #endif +#include "trpc.h" #include "twal.h" typedef enum _VN_STATUS { @@ -51,8 +52,9 @@ typedef struct { typedef struct { int32_t code; int32_t processedCount; - void * rpcHandle; - void * rpcAhandle; + int32_t qtype; + void * pVnode; + SRpcMsg rpcMsg; SRspRet rspRet; char reserveForSync[16]; SWalHead pHead[]; diff --git a/src/os/inc/osAlpine.h b/src/os/inc/osAlpine.h index d939adfb6dafb5251f936187225857660842e54e..eba94593955e32fc354500758bd2af53060eaa7a 100644 --- a/src/os/inc/osAlpine.h +++ b/src/os/inc/osAlpine.h @@ -77,6 +77,7 @@ extern "C" { #include #include #include +#include typedef int(*__compar_fn_t)(const void *, const void *); void error (int, int, const char *); diff --git a/src/os/inc/osArm32.h b/src/os/inc/osArm32.h index 17b4d2dbd53fbc516900bf9aa2db69e7f44dace0..24ff95522ebc7ca2b2c23e9961371df58aa2fc6d 100644 --- a/src/os/inc/osArm32.h +++ b/src/os/inc/osArm32.h @@ -76,6 +76,7 @@ extern "C" { #include #include #include +#include #define TAOS_OS_FUNC_LZ4 #define BUILDIN_CLZL(val) __builtin_clzll(val) diff --git a/src/os/inc/osArm64.h b/src/os/inc/osArm64.h index 3ae08b45f49c588315ce014e092d452e2fad303e..22f0000e96c5083556a3751e57f5bdb18d686332 100644 --- a/src/os/inc/osArm64.h +++ b/src/os/inc/osArm64.h @@ -77,6 +77,7 @@ extern "C" { #include #include #include +#include #ifdef __cplusplus } diff --git a/src/os/inc/osDarwin.h b/src/os/inc/osDarwin.h index 7bb844831e03dd1e21abd389a61976e14927ebbd..1461ec6d3b177c300e667d93043308d93ea67d0f 100644 --- a/src/os/inc/osDarwin.h +++ b/src/os/inc/osDarwin.h @@ -70,6 +70,7 @@ extern "C" { #include #include #include +#include #define TAOS_OS_FUNC_FILE_SENDIFLE diff --git a/src/os/inc/osLinux32.h b/src/os/inc/osLinux32.h index 93e917e797a60c9704726ea44d585d33d4ec102a..cfef05368fe76f14604a91f1551b850a79898a4b 100644 --- a/src/os/inc/osLinux32.h +++ b/src/os/inc/osLinux32.h @@ -76,6 +76,7 @@ extern "C" { #include #include #include +#include #define TAOS_OS_FUNC_LZ4 #define BUILDIN_CLZL(val) __builtin_clzll(val) diff --git a/src/os/inc/osLinux64.h b/src/os/inc/osLinux64.h index c0841c41bde7ab848bcae2b9cbd037675407afbf..a2febd51b7b769439b681aaf1871fe514b5558f3 100644 --- a/src/os/inc/osLinux64.h +++ b/src/os/inc/osLinux64.h @@ -79,6 +79,7 @@ extern "C" { #include #endif #include +#include #ifdef __cplusplus } diff --git a/src/os/inc/osWindows.h b/src/os/inc/osWindows.h index 5003e48c44a75011a0006a923a6e53d3d7639195..5a1e6425728b88a5461ce3fe83a060e23161ae86 100644 --- a/src/os/inc/osWindows.h +++ b/src/os/inc/osWindows.h @@ -40,6 +40,7 @@ #include #include #include +#include #include "msvcProcess.h" #include "msvcDirect.h" #include "msvcFcntl.h" diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index 348f91820b5be6d35a8fbff1ce32ba5bd2bb1689..060badba9dc188c2741d280d600c709ee2740099 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -504,6 +504,8 @@ void *syncRetrieveData(void *param) { SSyncNode *pNode = pPeer->pSyncNode; taosBlockSIGPIPE(); + if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->vgId, pPeer->numOfRetrieves); + pPeer->fileChanged = 0; pPeer->syncFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0); if (pPeer->syncFd < 0) { @@ -520,10 +522,7 @@ void *syncRetrieveData(void *param) { } if (pPeer->fileChanged) { - // if file is changed 3 times continuously, start flow control pPeer->numOfRetrieves++; - if (pPeer->numOfRetrieves >= 2 && pNode->notifyFlowCtrl) - (*pNode->notifyFlowCtrl)(pNode->vgId, 4 << (pPeer->numOfRetrieves - 2)); } else { pPeer->numOfRetrieves = 0; if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->vgId, 0); diff --git a/src/vnode/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h index 021831a644e1da3d1de882e14852d1fa5e58bc1d..7fc9b100eff50bd6a68a1dd5f1f48acbb290c530 100644 --- a/src/vnode/inc/vnodeInt.h +++ b/src/vnode/inc/vnodeInt.h @@ -39,7 +39,7 @@ typedef struct { int32_t refCount; // reference count int32_t queuedWMsg; int32_t queuedRMsg; - int32_t delayMs; + int32_t flowctrlLevel; int8_t status; int8_t role; int8_t accessState; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index b94fea52bd4f8fbdd20677bf874684123f03a9d4..cd6d2ea7c0a4842207140d2e4cab1d30722998ad 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -34,7 +34,7 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno); static uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion); static int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId); static void vnodeNotifyRole(int32_t vgId, int8_t role); -static void vnodeCtrlFlow(int32_t vgId, int32_t mseconds); +static void vnodeCtrlFlow(int32_t vgId, int32_t level); static int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion); static void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code); static int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam); @@ -660,7 +660,7 @@ static int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId) { static void vnodeNotifyRole(int32_t vgId, int8_t role) { SVnodeObj *pVnode = vnodeAcquire(vgId); if (pVnode == NULL) { - vError("vgId:%d, vnode not found while notify role", vgId); + vTrace("vgId:%d, vnode not found while notify role", vgId); return; } @@ -677,17 +677,15 @@ static void vnodeNotifyRole(int32_t vgId, int8_t role) { vnodeRelease(pVnode); } -static void vnodeCtrlFlow(int32_t vgId, int32_t mseconds) { +static void vnodeCtrlFlow(int32_t vgId, int32_t level) { SVnodeObj *pVnode = vnodeAcquire(vgId); if (pVnode == NULL) { - vError("vgId:%d, vnode not found while ctrl flow", vgId); + vTrace("vgId:%d, vnode not found while flow ctrl", vgId); return; } - if (pVnode->delayMs != mseconds) { - pVnode->delayMs = mseconds; - vDebug("vgId:%d, sync flow control, mseconds:%d", pVnode->vgId, mseconds); - } + pVnode->flowctrlLevel = level; + vDebug("vgId:%d, set flowctrl level:%d", pVnode->vgId, level); vnodeRelease(pVnode); } diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index a0227d84babef8ee4259c7d86121770660097020..57bd407cd1f47ab875c57326f361f80103ecc2b6 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -17,19 +17,23 @@ #include "os.h" #include "taosmsg.h" #include "taoserror.h" +#include "tglobal.h" #include "tqueue.h" #include "trpc.h" #include "tsdb.h" #include "twal.h" #include "tsync.h" +#include "ttimer.h" #include "tdataformat.h" #include "vnode.h" #include "vnodeInt.h" #include "syncInt.h" #include "tcq.h" +#include "dnode.h" #define MAX_QUEUED_MSG_NUM 10000 +extern void * tsDnodeTmr; static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, SRspRet *); static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *); static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *); @@ -37,6 +41,7 @@ static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *); static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *); static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *); +static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite); void vnodeInitWriteFp(void) { vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessSubmitMsg; @@ -77,8 +82,6 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara // assign version pHead->version = pVnode->version + 1; - if (pVnode->delayMs) taosMsleep(pVnode->delayMs); - } else { // from wal or forward // for data from WAL or forward, version may be smaller if (pHead->version <= pVnode->version) return 0; @@ -218,9 +221,10 @@ static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspR int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rparam) { SVnodeObj *pVnode = vparam; SWalHead * pHead = wparam; + int32_t code = 0; if (qtype == TAOS_QTYPE_RPC) { - int32_t code = vnodeCheckWrite(pVnode); + code = vnodeCheckWrite(pVnode); if (code != TSDB_CODE_SUCCESS) return code; } @@ -237,11 +241,12 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar if (rparam != NULL) { SRpcMsg *pRpcMsg = rparam; - pWrite->rpcHandle = pRpcMsg->handle; - pWrite->rpcAhandle = pRpcMsg->ahandle; + pWrite->rpcMsg = *pRpcMsg; } memcpy(pWrite->pHead, pHead, sizeof(SWalHead) + pHead->len); + pWrite->pVnode = pVnode; + pWrite->qtype = qtype; atomic_add_fetch_32(&pVnode->refCount, 1); @@ -251,6 +256,9 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar taosMsleep(1); } + code = vnodePerformFlowCtrl(pWrite); + if (code != 0) return 0; + vTrace("vgId:%d, write into vwqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedWMsg); taosWriteQitem(pVnode->wqueue, qtype, pWrite); @@ -260,9 +268,50 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar void vnodeFreeFromWQueue(void *vparam, SVWriteMsg *pWrite) { SVnodeObj *pVnode = vparam; - atomic_sub_fetch_32(&pVnode->queuedWMsg, 1); - vTrace("vgId:%d, free from vwqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedWMsg); + int32_t queued = atomic_sub_fetch_32(&pVnode->queuedWMsg, 1); + vTrace("vgId:%d, msg:%p, app:%p, free from vwqueue, queued:%d", pVnode->vgId, pWrite, pWrite->rpcMsg.ahandle, queued); taosFreeQitem(pWrite); vnodeRelease(pVnode); } + +static void vnodeFlowCtrlMsgToWQueue(void *param, void *tmrId) { + SVWriteMsg *pWrite = param; + SVnodeObj * pVnode = pWrite->pVnode; + int32_t code = TSDB_CODE_VND_SYNCING; + + pWrite->processedCount++; + if (pWrite->processedCount > 100) { + vError("vgId:%d, msg:%p, failed to process since %s", pVnode->vgId, pWrite, tstrerror(code)); + pWrite->processedCount = 1; + dnodeSendRpcVWriteRsp(pWrite->pVnode, pWrite, code); + } else { + code = vnodePerformFlowCtrl(pWrite); + if (code == 0) { + vTrace("vgId:%d, write into vwqueue after flowctrl", pVnode->vgId); + pWrite->processedCount = 0; + taosWriteQitem(pVnode->wqueue, pWrite->qtype, pWrite); + } + } +} + +static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) { + SVnodeObj *pVnode = pWrite->pVnode; + if (pVnode->flowctrlLevel <= 0) return 0; + if (pWrite->qtype != TAOS_QTYPE_RPC) return 0; + + if (tsFlowCtrl == 0) { + int32_t ms = pow(2, pVnode->flowctrlLevel + 2); + if (ms > 100) ms = 100; + vTrace("vgId:%d, msg:%p, app:%p, perform flowctrl for %d ms", pVnode->vgId, pWrite, pWrite->rpcMsg.ahandle, ms); + taosMsleep(ms); + return 0; + } else { + void *unUsed = NULL; + taosTmrReset(vnodeFlowCtrlMsgToWQueue, 100, pWrite, tsDnodeTmr, &unUsed); + + vTrace("vgId:%d, msg:%p, app:%p, perform flowctrl, count:%d", pVnode->vgId, pWrite, pWrite->rpcMsg.ahandle, + pWrite->processedCount); + return TSDB_CODE_VND_ACTION_IN_PROGRESS; + } +} diff --git a/tests/script/unique/cluster/flowctrl.sim b/tests/script/unique/cluster/flowctrl.sim new file mode 100644 index 0000000000000000000000000000000000000000..6dc60d9fba0fd0a5f7a8088d8277c2da9e06a259 --- /dev/null +++ b/tests/script/unique/cluster/flowctrl.sim @@ -0,0 +1,131 @@ +system sh/stop_dnodes.sh + +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 +system sh/deploy.sh -n dnode3 -i 3 + +system sh/cfg.sh -n dnode1 -c numOfMnodes -v 3 +system sh/cfg.sh -n dnode2 -c numOfMnodes -v 3 +system sh/cfg.sh -n dnode3 -c numOfMnodes -v 3 + +system sh/cfg.sh -n dnode1 -c mnodeEqualVnodeNum -v 4 +system sh/cfg.sh -n dnode2 -c mnodeEqualVnodeNum -v 4 +system sh/cfg.sh -n dnode3 -c mnodeEqualVnodeNum -v 4 + +system sh/cfg.sh -n dnode1 -c http -v 0 +system sh/cfg.sh -n dnode2 -c http -v 0 +system sh/cfg.sh -n dnode3 -c http -v 0 + +system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 20000 +system sh/cfg.sh -n dnode2 -c maxTablesPerVnode -v 20000 +system sh/cfg.sh -n dnode3 -c maxTablesPerVnode -v 20000 + +system sh/cfg.sh -n dnode1 -c maxVgroupsPerDb -v 20 +system sh/cfg.sh -n dnode2 -c maxVgroupsPerDb -v 20 +system sh/cfg.sh -n dnode3 -c maxVgroupsPerDb -v 20 + +system sh/cfg.sh -n dnode1 -c replica -v 3 +system sh/cfg.sh -n dnode2 -c replica -v 3 +system sh/cfg.sh -n dnode3 -c replica -v 3 + +print ============== deploy + +system sh/exec.sh -n dnode1 -s start +sleep 5001 +sql connect + +sql create dnode $hostname2 +sql create dnode $hostname3 +system sh/exec.sh -n dnode2 -s start +system sh/exec.sh -n dnode3 -s start + +print =============== step1 +$x = 0 +show1: + $x = $x + 1 + sleep 2000 + if $x == 5 then + return -1 + endi +sql show mnodes -x show1 +$mnode1Role = $data2_1 +print mnode1Role $mnode1Role +$mnode2Role = $data2_2 +print mnode2Role $mnode2Role +$mnode3Role = $data2_3 +print mnode3Role $mnode3Role + +if $mnode1Role != master then + goto show1 +endi +if $mnode2Role != slave then + goto show1 +endi +if $mnode3Role != slave then + goto show1 +endi + +print =============== step2 + +sql create database db replica 3 +sql use db +sql create table tb (ts timestamp, test int) + +$x = 0 +while $x < 100 + $ms = $x . s + sql insert into tb values (now + $ms , $x ) + $x = $x + 1 +endw + +print =============== step3 +sleep 3000 + +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode2 -s stop -x SIGINT +system sh/exec.sh -n dnode3 -s stop -x SIGINT + +print =============== step4 +sleep 5000 +system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode2 -s start +system sh/exec.sh -n dnode3 -s start + +print =============== step5 +sleep 8000 +while $x < 200 + $ms = $x . s + sql insert into tb values (now + $ms , $x ) + $x = $x + 1 +endw + +print =============== step6 +system sh/exec.sh -n dnode2 -s stop -x SIGINT +sleep 3000 +while $x < 300 + $ms = $x . s + sql insert into tb values (now + $ms , $x ) + $x = $x + 1 +endw + +system sh/exec.sh -n dnode2 -s start + +sleep 6000 +print =============== step7 +while $x < 400 + $ms = $x . s + sql insert into tb values (now + $ms , $x ) + $x = $x + 1 + sleep 1 +endw + +print =============== step8 +sql select * from tb +print rows $rows +if $rows != 400 then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode2 -s stop -x SIGINT +system sh/exec.sh -n dnode3 -s stop -x SIGINT \ No newline at end of file