提交 0ab1a0ff 编写于 作者: H Hongze Cheng

Merge branch 'develop' into feature/TD-1413

...@@ -97,6 +97,7 @@ extern int32_t tsAlternativeRole; ...@@ -97,6 +97,7 @@ extern int32_t tsAlternativeRole;
extern int32_t tsBalanceInterval; extern int32_t tsBalanceInterval;
extern int32_t tsOfflineThreshold; extern int32_t tsOfflineThreshold;
extern int32_t tsMnodeEqualVnodeNum; extern int32_t tsMnodeEqualVnodeNum;
extern int32_t tsFlowCtrl;
// restful // restful
extern int32_t tsEnableHttpModule; extern int32_t tsEnableHttpModule;
......
...@@ -133,6 +133,7 @@ int32_t tsAlternativeRole = 0; ...@@ -133,6 +133,7 @@ int32_t tsAlternativeRole = 0;
int32_t tsBalanceInterval = 300; // seconds int32_t tsBalanceInterval = 300; // seconds
int32_t tsOfflineThreshold = 86400*100; // seconds 10days int32_t tsOfflineThreshold = 86400*100; // seconds 10days
int32_t tsMnodeEqualVnodeNum = 4; int32_t tsMnodeEqualVnodeNum = 4;
int32_t tsFlowCtrl = 1;
// restful // restful
int32_t tsEnableHttpModule = 1; int32_t tsEnableHttpModule = 1;
...@@ -1011,6 +1012,17 @@ static void doInitGlobalConfig(void) { ...@@ -1011,6 +1012,17 @@ static void doInitGlobalConfig(void) {
cfg.maxValue = 1000; cfg.maxValue = 1000;
cfg.ptrLength = 0; cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
// module configs
cfg.option = "flowctrl";
cfg.ptr = &tsFlowCtrl;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 0;
cfg.maxValue = 1;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
cfg.option = "http"; cfg.option = "http";
......
package com.taosdata.jdbc.cases;
import org.junit.Test;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
public class AppMemoryLeakTest {
@Test(expected = SQLException.class)
public void testCreateTooManyConnection() throws ClassNotFoundException, SQLException {
Class.forName("com.taosdata.jdbc.TSDBDriver");
int conCnt = 0;
while (true) {
Connection conn = DriverManager.getConnection("jdbc:TAOS://localhost:6030/?user=root&password=taosdata");
System.out.println(conCnt++ + " : " + conn);
}
}
@Test
public void testCreateTooManyStatement() throws ClassNotFoundException, SQLException {
Class.forName("com.taosdata.jdbc.TSDBDriver");
int stmtCnt = 0;
Connection conn = DriverManager.getConnection("jdbc:TAOS://localhost:6030/?user=root&password=taosdata");
while (true) {
Statement stmt = conn.createStatement();
System.out.println(++stmtCnt + " : " + stmt);
}
}
public static void main(String[] args) throws ClassNotFoundException, SQLException {
Class.forName("com.taosdata.jdbc.TSDBDriver");
int stmtCnt = 0;
Connection conn = DriverManager.getConnection("jdbc:TAOS://localhost:6030/?user=root&password=taosdata");
while (true) {
Statement stmt = conn.createStatement();
System.out.println(++stmtCnt + " : " + stmt);
}
}
}
...@@ -162,7 +162,7 @@ static void *dnodeProcessMPeerQueue(void *param) { ...@@ -162,7 +162,7 @@ static void *dnodeProcessMPeerQueue(void *param) {
break; break;
} }
dDebug("msg:%s will be processed in mpeer queue", taosMsg[pPeerMsg->rpcMsg.msgType]); dTrace("msg:%s will be processed in mpeer queue", taosMsg[pPeerMsg->rpcMsg.msgType]);
int32_t code = mnodeProcessPeerReq(pPeerMsg); int32_t code = mnodeProcessPeerReq(pPeerMsg);
dnodeSendRpcMPeerRsp(pPeerMsg, code); dnodeSendRpcMPeerRsp(pPeerMsg, code);
} }
......
...@@ -168,7 +168,7 @@ static void *dnodeProcessMReadQueue(void *param) { ...@@ -168,7 +168,7 @@ static void *dnodeProcessMReadQueue(void *param) {
break; break;
} }
dDebug("msg:%p, app:%p type:%s will be processed in mread queue", pRead->rpcMsg.ahandle, pRead, dTrace("msg:%p, app:%p type:%s will be processed in mread queue", pRead->rpcMsg.ahandle, pRead,
taosMsg[pRead->rpcMsg.msgType]); taosMsg[pRead->rpcMsg.msgType]);
int32_t code = mnodeProcessRead(pRead); int32_t code = mnodeProcessRead(pRead);
dnodeSendRpcMReadRsp(pRead, code); dnodeSendRpcMReadRsp(pRead, code);
......
...@@ -174,7 +174,7 @@ static void *dnodeProcessMWriteQueue(void *param) { ...@@ -174,7 +174,7 @@ static void *dnodeProcessMWriteQueue(void *param) {
break; break;
} }
dDebug("msg:%p, app:%p type:%s will be processed in mwrite queue", pWrite, pWrite->rpcMsg.ahandle, dTrace("msg:%p, app:%p type:%s will be processed in mwrite queue", pWrite, pWrite->rpcMsg.ahandle,
taosMsg[pWrite->rpcMsg.msgType]); taosMsg[pWrite->rpcMsg.msgType]);
int32_t code = mnodeProcessWrite(pWrite); int32_t code = mnodeProcessWrite(pWrite);
......
...@@ -151,9 +151,9 @@ void dnodeCleanupClient() { ...@@ -151,9 +151,9 @@ void dnodeCleanupClient() {
} }
static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
if (dnodeGetRunStatus() != TSDB_RUN_STATUS_RUNING) { if (dnodeGetRunStatus() == TSDB_RUN_STATUS_STOPPED) {
if (pMsg == NULL || pMsg->pCont == NULL) return; if (pMsg == NULL || pMsg->pCont == NULL) return;
dDebug("msg:%p is ignored since dnode not running", pMsg); dDebug("msg:%p is ignored since dnode is stopping", pMsg);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
return; return;
} }
......
...@@ -175,7 +175,7 @@ static void *dnodeProcessReadQueue(void *pWorker) { ...@@ -175,7 +175,7 @@ static void *dnodeProcessReadQueue(void *pWorker) {
break; break;
} }
dDebug("msg:%p, app:%p type:%s will be processed in vread queue, qtype:%d", pRead, pRead->rpcAhandle, dTrace("msg:%p, app:%p type:%s will be processed in vread queue, qtype:%d", pRead, pRead->rpcAhandle,
taosMsg[pRead->msgType], qtype); taosMsg[pRead->msgType], qtype);
int32_t code = vnodeProcessRead(pVnode, pRead); int32_t code = vnodeProcessRead(pVnode, pRead);
......
...@@ -176,7 +176,7 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *wparam, int32_t code) { ...@@ -176,7 +176,7 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *wparam, int32_t code) {
if (count <= 1) return; if (count <= 1) return;
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.handle = pWrite->rpcHandle, .handle = pWrite->rpcMsg.handle,
.pCont = pWrite->rspRet.rsp, .pCont = pWrite->rspRet.rsp,
.contLen = pWrite->rspRet.len, .contLen = pWrite->rspRet.len,
.code = pWrite->code, .code = pWrite->code,
...@@ -206,7 +206,7 @@ static void *dnodeProcessVWriteQueue(void *wparam) { ...@@ -206,7 +206,7 @@ static void *dnodeProcessVWriteQueue(void *wparam) {
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite); taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite);
dTrace("msg:%p, app:%p type:%s will be processed in vwrite queue, qtype:%s hver:%" PRIu64, pWrite, dTrace("msg:%p, app:%p type:%s will be processed in vwrite queue, qtype:%s hver:%" PRIu64, pWrite,
pWrite->rpcAhandle, taosMsg[pWrite->pHead->msgType], qtypeStr[qtype], pWrite->pHead->version); pWrite->rpcMsg.ahandle, taosMsg[pWrite->pHead->msgType], qtypeStr[qtype], pWrite->pHead->version);
pWrite->code = vnodeProcessWrite(pVnode, pWrite->pHead, qtype, &pWrite->rspRet); pWrite->code = vnodeProcessWrite(pVnode, pWrite->pHead, qtype, &pWrite->rspRet);
if (pWrite->code <= 0) pWrite->processedCount = 1; if (pWrite->code <= 0) pWrite->processedCount = 1;
......
...@@ -205,6 +205,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VRESION_FILE, 0, 0x050A, "Invalid ve ...@@ -205,6 +205,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VRESION_FILE, 0, 0x050A, "Invalid ve
TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FULL, 0, 0x050B, "Vnode memory is full because commit failed") TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FULL, 0, 0x050B, "Vnode memory is full because commit failed")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, 0, 0x0511, "Database suspended") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, 0, 0x0511, "Database suspended")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, 0, 0x0512, "Write operation denied") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, 0, 0x0512, "Write operation denied")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_SYNCING, 0, 0x0513, "Database is syncing")
// tsdb // tsdb
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, 0, 0x0600, "Invalid table ID") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, 0, 0x0600, "Invalid table ID")
......
...@@ -23,7 +23,7 @@ extern "C" { ...@@ -23,7 +23,7 @@ extern "C" {
#define TAOS_SYNC_MAX_REPLICA 5 #define TAOS_SYNC_MAX_REPLICA 5
#define TAOS_SYNC_MAX_INDEX 0x7FFFFFFF #define TAOS_SYNC_MAX_INDEX 0x7FFFFFFF
typedef enum _TAOS_SYNC_ROLE { typedef enum {
TAOS_SYNC_ROLE_OFFLINE = 0, TAOS_SYNC_ROLE_OFFLINE = 0,
TAOS_SYNC_ROLE_UNSYNCED = 1, TAOS_SYNC_ROLE_UNSYNCED = 1,
TAOS_SYNC_ROLE_SYNCING = 2, TAOS_SYNC_ROLE_SYNCING = 2,
...@@ -31,7 +31,7 @@ typedef enum _TAOS_SYNC_ROLE { ...@@ -31,7 +31,7 @@ typedef enum _TAOS_SYNC_ROLE {
TAOS_SYNC_ROLE_MASTER = 4 TAOS_SYNC_ROLE_MASTER = 4
} ESyncRole; } ESyncRole;
typedef enum _TAOS_SYNC_STATUS { typedef enum {
TAOS_SYNC_STATUS_INIT = 0, TAOS_SYNC_STATUS_INIT = 0,
TAOS_SYNC_STATUS_START = 1, TAOS_SYNC_STATUS_START = 1,
TAOS_SYNC_STATUS_FILE = 2, TAOS_SYNC_STATUS_FILE = 2,
...@@ -80,7 +80,7 @@ typedef void (*FConfirmForward)(int32_t vgId, void *mhandle, int32_t code); ...@@ -80,7 +80,7 @@ typedef void (*FConfirmForward)(int32_t vgId, void *mhandle, int32_t code);
typedef void (*FNotifyRole)(int32_t vgId, int8_t role); typedef void (*FNotifyRole)(int32_t vgId, int8_t role);
// if a number of retrieving data failed, call this to start flow control // if a number of retrieving data failed, call this to start flow control
typedef void (*FNotifyFlowCtrl)(int32_t vgId, int32_t mseconds); typedef void (*FNotifyFlowCtrl)(int32_t vgId, int32_t level);
// when data file is synced successfully, notity app // when data file is synced successfully, notity app
typedef int32_t (*FNotifyFileSynced)(int32_t vgId, uint64_t fversion); typedef int32_t (*FNotifyFileSynced)(int32_t vgId, uint64_t fversion);
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
extern "C" { extern "C" {
#endif #endif
#include "trpc.h"
#include "twal.h" #include "twal.h"
typedef enum _VN_STATUS { typedef enum _VN_STATUS {
...@@ -51,8 +52,9 @@ typedef struct { ...@@ -51,8 +52,9 @@ typedef struct {
typedef struct { typedef struct {
int32_t code; int32_t code;
int32_t processedCount; int32_t processedCount;
void * rpcHandle; int32_t qtype;
void * rpcAhandle; void * pVnode;
SRpcMsg rpcMsg;
SRspRet rspRet; SRspRet rspRet;
char reserveForSync[16]; char reserveForSync[16];
SWalHead pHead[]; SWalHead pHead[];
......
...@@ -77,6 +77,7 @@ extern "C" { ...@@ -77,6 +77,7 @@ extern "C" {
#include <sys/utsname.h> #include <sys/utsname.h>
#include <sys/resource.h> #include <sys/resource.h>
#include <linux/sysctl.h> #include <linux/sysctl.h>
#include <math.h>
typedef int(*__compar_fn_t)(const void *, const void *); typedef int(*__compar_fn_t)(const void *, const void *);
void error (int, int, const char *); void error (int, int, const char *);
......
...@@ -76,6 +76,7 @@ extern "C" { ...@@ -76,6 +76,7 @@ extern "C" {
#include <sys/utsname.h> #include <sys/utsname.h>
#include <sys/resource.h> #include <sys/resource.h>
#include <error.h> #include <error.h>
#include <math.h>
#define TAOS_OS_FUNC_LZ4 #define TAOS_OS_FUNC_LZ4
#define BUILDIN_CLZL(val) __builtin_clzll(val) #define BUILDIN_CLZL(val) __builtin_clzll(val)
......
...@@ -77,6 +77,7 @@ extern "C" { ...@@ -77,6 +77,7 @@ extern "C" {
#include <sys/resource.h> #include <sys/resource.h>
#include <error.h> #include <error.h>
#include <linux/sysctl.h> #include <linux/sysctl.h>
#include <math.h>
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -70,6 +70,7 @@ extern "C" { ...@@ -70,6 +70,7 @@ extern "C" {
#include <dispatch/dispatch.h> #include <dispatch/dispatch.h>
#include <fcntl.h> #include <fcntl.h>
#include <sys/utsname.h> #include <sys/utsname.h>
#include <math.h>
#define TAOS_OS_FUNC_FILE_SENDIFLE #define TAOS_OS_FUNC_FILE_SENDIFLE
......
...@@ -76,6 +76,7 @@ extern "C" { ...@@ -76,6 +76,7 @@ extern "C" {
#include <sys/utsname.h> #include <sys/utsname.h>
#include <sys/resource.h> #include <sys/resource.h>
#include <error.h> #include <error.h>
#include <math.h>
#define TAOS_OS_FUNC_LZ4 #define TAOS_OS_FUNC_LZ4
#define BUILDIN_CLZL(val) __builtin_clzll(val) #define BUILDIN_CLZL(val) __builtin_clzll(val)
......
...@@ -79,6 +79,7 @@ extern "C" { ...@@ -79,6 +79,7 @@ extern "C" {
#include <error.h> #include <error.h>
#endif #endif
#include <linux/sysctl.h> #include <linux/sysctl.h>
#include <math.h>
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -40,6 +40,7 @@ ...@@ -40,6 +40,7 @@
#include <time.h> #include <time.h>
#include <inttypes.h> #include <inttypes.h>
#include <conio.h> #include <conio.h>
#include <math.h>
#include "msvcProcess.h" #include "msvcProcess.h"
#include "msvcDirect.h" #include "msvcDirect.h"
#include "msvcFcntl.h" #include "msvcFcntl.h"
......
...@@ -504,6 +504,8 @@ void *syncRetrieveData(void *param) { ...@@ -504,6 +504,8 @@ void *syncRetrieveData(void *param) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
taosBlockSIGPIPE(); taosBlockSIGPIPE();
if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->vgId, pPeer->numOfRetrieves);
pPeer->fileChanged = 0; pPeer->fileChanged = 0;
pPeer->syncFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0); pPeer->syncFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0);
if (pPeer->syncFd < 0) { if (pPeer->syncFd < 0) {
...@@ -520,10 +522,7 @@ void *syncRetrieveData(void *param) { ...@@ -520,10 +522,7 @@ void *syncRetrieveData(void *param) {
} }
if (pPeer->fileChanged) { if (pPeer->fileChanged) {
// if file is changed 3 times continuously, start flow control
pPeer->numOfRetrieves++; pPeer->numOfRetrieves++;
if (pPeer->numOfRetrieves >= 2 && pNode->notifyFlowCtrl)
(*pNode->notifyFlowCtrl)(pNode->vgId, 4 << (pPeer->numOfRetrieves - 2));
} else { } else {
pPeer->numOfRetrieves = 0; pPeer->numOfRetrieves = 0;
if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->vgId, 0); if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->vgId, 0);
......
...@@ -39,7 +39,7 @@ typedef struct { ...@@ -39,7 +39,7 @@ typedef struct {
int32_t refCount; // reference count int32_t refCount; // reference count
int32_t queuedWMsg; int32_t queuedWMsg;
int32_t queuedRMsg; int32_t queuedRMsg;
int32_t delayMs; int32_t flowctrlLevel;
int8_t status; int8_t status;
int8_t role; int8_t role;
int8_t accessState; int8_t accessState;
......
...@@ -37,7 +37,7 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno); ...@@ -37,7 +37,7 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno);
static uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion); static uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion);
static int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId); static int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId);
static void vnodeNotifyRole(int32_t vgId, int8_t role); static void vnodeNotifyRole(int32_t vgId, int8_t role);
static void vnodeCtrlFlow(int32_t vgId, int32_t mseconds); static void vnodeCtrlFlow(int32_t vgId, int32_t level);
static int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion); static int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion);
static void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code); static void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code);
static int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam); static int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam);
...@@ -650,7 +650,7 @@ static int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId) { ...@@ -650,7 +650,7 @@ static int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId) {
static void vnodeNotifyRole(int32_t vgId, int8_t role) { static void vnodeNotifyRole(int32_t vgId, int8_t role) {
SVnodeObj *pVnode = vnodeAcquire(vgId); SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) { if (pVnode == NULL) {
vError("vgId:%d, vnode not found while notify role", vgId); vTrace("vgId:%d, vnode not found while notify role", vgId);
return; return;
} }
...@@ -667,17 +667,15 @@ static void vnodeNotifyRole(int32_t vgId, int8_t role) { ...@@ -667,17 +667,15 @@ static void vnodeNotifyRole(int32_t vgId, int8_t role) {
vnodeRelease(pVnode); vnodeRelease(pVnode);
} }
static void vnodeCtrlFlow(int32_t vgId, int32_t mseconds) { static void vnodeCtrlFlow(int32_t vgId, int32_t level) {
SVnodeObj *pVnode = vnodeAcquire(vgId); SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) { if (pVnode == NULL) {
vError("vgId:%d, vnode not found while ctrl flow", vgId); vTrace("vgId:%d, vnode not found while flow ctrl", vgId);
return; return;
} }
if (pVnode->delayMs != mseconds) { pVnode->flowctrlLevel = level;
pVnode->delayMs = mseconds; vDebug("vgId:%d, set flowctrl level:%d", pVnode->vgId, level);
vDebug("vgId:%d, sync flow control, mseconds:%d", pVnode->vgId, mseconds);
}
vnodeRelease(pVnode); vnodeRelease(pVnode);
} }
......
...@@ -17,19 +17,23 @@ ...@@ -17,19 +17,23 @@
#include "os.h" #include "os.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "taoserror.h" #include "taoserror.h"
#include "tglobal.h"
#include "tqueue.h" #include "tqueue.h"
#include "trpc.h" #include "trpc.h"
#include "tsdb.h" #include "tsdb.h"
#include "twal.h" #include "twal.h"
#include "tsync.h" #include "tsync.h"
#include "ttimer.h"
#include "tdataformat.h" #include "tdataformat.h"
#include "vnode.h" #include "vnode.h"
#include "vnodeInt.h" #include "vnodeInt.h"
#include "syncInt.h" #include "syncInt.h"
#include "tcq.h" #include "tcq.h"
#include "dnode.h"
#define MAX_QUEUED_MSG_NUM 10000 #define MAX_QUEUED_MSG_NUM 10000
extern void * tsDnodeTmr;
static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, SRspRet *); static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, SRspRet *);
static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *); static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *); static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
...@@ -37,6 +41,7 @@ static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet ...@@ -37,6 +41,7 @@ static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet
static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *); static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *); static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *); static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite);
void vnodeInitWriteFp(void) { void vnodeInitWriteFp(void) {
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessSubmitMsg; vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessSubmitMsg;
...@@ -77,8 +82,6 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara ...@@ -77,8 +82,6 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara
// assign version // assign version
pHead->version = pVnode->version + 1; pHead->version = pVnode->version + 1;
if (pVnode->delayMs) taosMsleep(pVnode->delayMs);
} else { // from wal or forward } else { // from wal or forward
// for data from WAL or forward, version may be smaller // for data from WAL or forward, version may be smaller
if (pHead->version <= pVnode->version) return 0; if (pHead->version <= pVnode->version) return 0;
...@@ -218,9 +221,10 @@ static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspR ...@@ -218,9 +221,10 @@ static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspR
int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rparam) { int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rparam) {
SVnodeObj *pVnode = vparam; SVnodeObj *pVnode = vparam;
SWalHead * pHead = wparam; SWalHead * pHead = wparam;
int32_t code = 0;
if (qtype == TAOS_QTYPE_RPC) { if (qtype == TAOS_QTYPE_RPC) {
int32_t code = vnodeCheckWrite(pVnode); code = vnodeCheckWrite(pVnode);
if (code != TSDB_CODE_SUCCESS) return code; if (code != TSDB_CODE_SUCCESS) return code;
} }
...@@ -237,11 +241,12 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar ...@@ -237,11 +241,12 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar
if (rparam != NULL) { if (rparam != NULL) {
SRpcMsg *pRpcMsg = rparam; SRpcMsg *pRpcMsg = rparam;
pWrite->rpcHandle = pRpcMsg->handle; pWrite->rpcMsg = *pRpcMsg;
pWrite->rpcAhandle = pRpcMsg->ahandle;
} }
memcpy(pWrite->pHead, pHead, sizeof(SWalHead) + pHead->len); memcpy(pWrite->pHead, pHead, sizeof(SWalHead) + pHead->len);
pWrite->pVnode = pVnode;
pWrite->qtype = qtype;
atomic_add_fetch_32(&pVnode->refCount, 1); atomic_add_fetch_32(&pVnode->refCount, 1);
...@@ -251,6 +256,9 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar ...@@ -251,6 +256,9 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar
taosMsleep(1); taosMsleep(1);
} }
code = vnodePerformFlowCtrl(pWrite);
if (code != 0) return 0;
vTrace("vgId:%d, write into vwqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedWMsg); vTrace("vgId:%d, write into vwqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedWMsg);
taosWriteQitem(pVnode->wqueue, qtype, pWrite); taosWriteQitem(pVnode->wqueue, qtype, pWrite);
...@@ -260,9 +268,50 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar ...@@ -260,9 +268,50 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar
void vnodeFreeFromWQueue(void *vparam, SVWriteMsg *pWrite) { void vnodeFreeFromWQueue(void *vparam, SVWriteMsg *pWrite) {
SVnodeObj *pVnode = vparam; SVnodeObj *pVnode = vparam;
atomic_sub_fetch_32(&pVnode->queuedWMsg, 1); int32_t queued = atomic_sub_fetch_32(&pVnode->queuedWMsg, 1);
vTrace("vgId:%d, free from vwqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedWMsg); vTrace("vgId:%d, msg:%p, app:%p, free from vwqueue, queued:%d", pVnode->vgId, pWrite, pWrite->rpcMsg.ahandle, queued);
taosFreeQitem(pWrite); taosFreeQitem(pWrite);
vnodeRelease(pVnode); vnodeRelease(pVnode);
} }
static void vnodeFlowCtrlMsgToWQueue(void *param, void *tmrId) {
SVWriteMsg *pWrite = param;
SVnodeObj * pVnode = pWrite->pVnode;
int32_t code = TSDB_CODE_VND_SYNCING;
pWrite->processedCount++;
if (pWrite->processedCount > 100) {
vError("vgId:%d, msg:%p, failed to process since %s", pVnode->vgId, pWrite, tstrerror(code));
pWrite->processedCount = 1;
dnodeSendRpcVWriteRsp(pWrite->pVnode, pWrite, code);
} else {
code = vnodePerformFlowCtrl(pWrite);
if (code == 0) {
vTrace("vgId:%d, write into vwqueue after flowctrl", pVnode->vgId);
pWrite->processedCount = 0;
taosWriteQitem(pVnode->wqueue, pWrite->qtype, pWrite);
}
}
}
static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) {
SVnodeObj *pVnode = pWrite->pVnode;
if (pVnode->flowctrlLevel <= 0) return 0;
if (pWrite->qtype != TAOS_QTYPE_RPC) return 0;
if (tsFlowCtrl == 0) {
int32_t ms = pow(2, pVnode->flowctrlLevel + 2);
if (ms > 100) ms = 100;
vTrace("vgId:%d, msg:%p, app:%p, perform flowctrl for %d ms", pVnode->vgId, pWrite, pWrite->rpcMsg.ahandle, ms);
taosMsleep(ms);
return 0;
} else {
void *unUsed = NULL;
taosTmrReset(vnodeFlowCtrlMsgToWQueue, 100, pWrite, tsDnodeTmr, &unUsed);
vTrace("vgId:%d, msg:%p, app:%p, perform flowctrl, count:%d", pVnode->vgId, pWrite, pWrite->rpcMsg.ahandle,
pWrite->processedCount);
return TSDB_CODE_VND_ACTION_IN_PROGRESS;
}
}
...@@ -146,7 +146,7 @@ class ConcurrentInquiry: ...@@ -146,7 +146,7 @@ class ConcurrentInquiry:
col_list=self.stb_stru_list[tbi-1] col_list=self.stb_stru_list[tbi-1]
tag_list=self.stb_tag_list[tbi-1] tag_list=self.stb_tag_list[tbi-1]
is_stb=1 is_stb=1
tlist=col_list+tag_list tlist=col_list+tag_list+['abc'] #增加不存在的域'abc',是否会引起新bug
con_rand=random.randint(0,len(condition_list)) con_rand=random.randint(0,len(condition_list))
func_rand=random.randint(0,len(func_list)) func_rand=random.randint(0,len(func_list))
col_rand=random.randint(0,len(col_list)) col_rand=random.randint(0,len(col_list))
......
...@@ -18,10 +18,10 @@ import time ...@@ -18,10 +18,10 @@ import time
import argparse import argparse
class RestfulInsert: class RestfulInsert:
def __init__(self, host, dbname, threads, tables, records, batchSize, tbNamePerfix, outOfOrder): def __init__(self, host, startTimestamp, dbname, threads, tables, records, batchSize, tbNamePerfix, outOfOrder):
self.header = {'Authorization': 'Basic cm9vdDp0YW9zZGF0YQ=='} self.header = {'Authorization': 'Basic cm9vdDp0YW9zZGF0YQ=='}
self.url = "http://%s:6041/rest/sql" % host self.url = "http://%s:6041/rest/sql" % host
self.ts = 1500000000000 self.ts = startTimestamp
self.dbname = dbname self.dbname = dbname
self.numOfThreads = threads self.numOfThreads = threads
self.numOfTables = tables self.numOfTables = tables
...@@ -36,8 +36,10 @@ class RestfulInsert: ...@@ -36,8 +36,10 @@ class RestfulInsert:
for i in range(tablesPerThread): for i in range(tablesPerThread):
tableID = threadID * tablesPerThread tableID = threadID * tablesPerThread
name = 'beijing' if tableID % 2 == 0 else 'shanghai' name = 'beijing' if tableID % 2 == 0 else 'shanghai'
data = "create table %s.%s%d using %s.meters tags(%d, '%s')" % (self.dbname, self.tableNamePerfix, tableID + i, self.dbname, tableID + i, name) data = "create table if not exists %s.%s%d using %s.meters tags(%d, '%s')" % (self.dbname, self.tableNamePerfix, tableID + i, self.dbname, tableID + i, name)
requests.post(self.url, data, headers = self.header) response = requests.post(self.url, data, headers = self.header)
if response.status_code != 200:
print(response.content)
def insertData(self, threadID): def insertData(self, threadID):
print("thread %d started" % threadID) print("thread %d started" % threadID)
...@@ -50,7 +52,9 @@ class RestfulInsert: ...@@ -50,7 +52,9 @@ class RestfulInsert:
values = [] values = []
for k in range(self.batchSize): for k in range(self.batchSize):
data += "(%d, %d, %d, %d)" % (start + j * self.batchSize + k, random.randint(1, 100), random.randint(1, 100), random.randint(1, 100)) data += "(%d, %d, %d, %d)" % (start + j * self.batchSize + k, random.randint(1, 100), random.randint(1, 100), random.randint(1, 100))
requests.post(self.url, data, headers = self.header) response = requests.post(self.url, data, headers = self.header)
if response.status_code != 200:
print(response.content)
def insertUnlimitedData(self, threadID): def insertUnlimitedData(self, threadID):
print("thread %d started" % threadID) print("thread %d started" % threadID)
...@@ -76,15 +80,15 @@ class RestfulInsert: ...@@ -76,15 +80,15 @@ class RestfulInsert:
else: else:
random.shuffle(values) random.shuffle(values)
for k in range(len(values)): for k in range(len(values)):
data += values[k] data += values[k]
requests.post(self.url, data, headers = self.header) response = requests.post(self.url, data, headers = self.header)
if response.status_code != 200:
print(response.content)
def run(self): def run(self):
data = "drop database if exists %s" % self.dbname data = "create database if not exists %s" % self.dbname
requests.post(self.url, data, headers = self.header)
data = "create database %s" % self.dbname
requests.post(self.url, data, headers = self.header) requests.post(self.url, data, headers = self.header)
data = "create table %s.meters(ts timestamp, f1 int, f2 int, f3 int) tags(id int, loc nchar(20))" % self.dbname data = "create table if not exists %s.meters(ts timestamp, f1 int, f2 int, f3 int) tags(id int, loc nchar(20))" % self.dbname
requests.post(self.url, data, headers = self.header) requests.post(self.url, data, headers = self.header)
threads = [] threads = []
...@@ -120,6 +124,13 @@ parser.add_argument( ...@@ -120,6 +124,13 @@ parser.add_argument(
default='127.0.0.1', default='127.0.0.1',
type=str, type=str,
help='host name to be connected (default: 127.0.0.1)') help='host name to be connected (default: 127.0.0.1)')
parser.add_argument(
'-S',
'--start-timestamp',
action='store',
default=1500000000000,
type=int,
help='insert data from timestamp (default: 1500000000000)')
parser.add_argument( parser.add_argument(
'-d', '-d',
'--db-name', '--db-name',
...@@ -169,5 +180,5 @@ parser.add_argument( ...@@ -169,5 +180,5 @@ parser.add_argument(
help='The order of test data (default: False)') help='The order of test data (default: False)')
args = parser.parse_args() args = parser.parse_args()
ri = RestfulInsert(args.host_name, args.db_name, args.number_of_threads, args.number_of_tables, args.number_of_records, args.batch_size, args.table_name_prefix, args.out_of_order) ri = RestfulInsert(args.host_name, args.start_timestamp, args.db_name, args.number_of_threads, args.number_of_tables, args.number_of_records, args.batch_size, args.table_name_prefix, args.out_of_order)
ri.run() ri.run()
\ No newline at end of file
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/cfg.sh -n dnode1 -c numOfMnodes -v 3
system sh/cfg.sh -n dnode2 -c numOfMnodes -v 3
system sh/cfg.sh -n dnode3 -c numOfMnodes -v 3
system sh/cfg.sh -n dnode1 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode2 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode3 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode1 -c http -v 0
system sh/cfg.sh -n dnode2 -c http -v 0
system sh/cfg.sh -n dnode3 -c http -v 0
system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 20000
system sh/cfg.sh -n dnode2 -c maxTablesPerVnode -v 20000
system sh/cfg.sh -n dnode3 -c maxTablesPerVnode -v 20000
system sh/cfg.sh -n dnode1 -c maxVgroupsPerDb -v 20
system sh/cfg.sh -n dnode2 -c maxVgroupsPerDb -v 20
system sh/cfg.sh -n dnode3 -c maxVgroupsPerDb -v 20
system sh/cfg.sh -n dnode1 -c replica -v 3
system sh/cfg.sh -n dnode2 -c replica -v 3
system sh/cfg.sh -n dnode3 -c replica -v 3
print ============== deploy
system sh/exec.sh -n dnode1 -s start
sleep 5001
sql connect
sql create dnode $hostname2
sql create dnode $hostname3
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
print =============== step1
$x = 0
show1:
$x = $x + 1
sleep 2000
if $x == 5 then
return -1
endi
sql show mnodes -x show1
$mnode1Role = $data2_1
print mnode1Role $mnode1Role
$mnode2Role = $data2_2
print mnode2Role $mnode2Role
$mnode3Role = $data2_3
print mnode3Role $mnode3Role
if $mnode1Role != master then
goto show1
endi
if $mnode2Role != slave then
goto show1
endi
if $mnode3Role != slave then
goto show1
endi
print =============== step2
sql create database db replica 3
sql use db
sql create table tb (ts timestamp, test int)
$x = 0
while $x < 100
$ms = $x . s
sql insert into tb values (now + $ms , $x )
$x = $x + 1
endw
print =============== step3
sleep 3000
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
print =============== step4
sleep 5000
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
print =============== step5
sleep 8000
while $x < 200
$ms = $x . s
sql insert into tb values (now + $ms , $x )
$x = $x + 1
endw
print =============== step6
system sh/exec.sh -n dnode2 -s stop -x SIGINT
sleep 3000
while $x < 300
$ms = $x . s
sql insert into tb values (now + $ms , $x )
$x = $x + 1
endw
system sh/exec.sh -n dnode2 -s start
sleep 6000
print =============== step7
while $x < 400
$ms = $x . s
sql insert into tb values (now + $ms , $x )
$x = $x + 1
sleep 1
endw
print =============== step8
sql select * from tb
print rows $rows
if $rows != 400 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
\ No newline at end of file
...@@ -17,9 +17,9 @@ function runSimCaseOneByOne { ...@@ -17,9 +17,9 @@ function runSimCaseOneByOne {
echo -e "${GREEN}$case success${NC}" | tee -a out.log || \ echo -e "${GREEN}$case success${NC}" | tee -a out.log || \
echo -e "${RED}$case failed${NC}" | tee -a out.log echo -e "${RED}$case failed${NC}" | tee -a out.log
out_log=`tail -1 out.log ` out_log=`tail -1 out.log `
if [[ $out_log =~ 'failed' ]];then # if [[ $out_log =~ 'failed' ]];then
exit 8 # exit 8
fi # fi
end_time=`date +%s` end_time=`date +%s`
echo execution time of $case was `expr $end_time - $start_time`s. | tee -a out.log echo execution time of $case was `expr $end_time - $start_time`s. | tee -a out.log
fi fi
...@@ -42,9 +42,9 @@ function runPyCaseOneByOne { ...@@ -42,9 +42,9 @@ function runPyCaseOneByOne {
echo -e "${RED}$case failed${NC}" | tee -a pytest-out.log echo -e "${RED}$case failed${NC}" | tee -a pytest-out.log
end_time=`date +%s` end_time=`date +%s`
out_log=`tail -1 pytest-out.log ` out_log=`tail -1 pytest-out.log `
if [[ $out_log =~ 'failed' ]];then # if [[ $out_log =~ 'failed' ]];then
exit 8 # exit 8
fi # fi
echo execution time of $case was `expr $end_time - $start_time`s. | tee -a pytest-out.log echo execution time of $case was `expr $end_time - $start_time`s. | tee -a pytest-out.log
else else
$line > /dev/null 2>&1 $line > /dev/null 2>&1
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册