提交 810023ca 编写于 作者: S Shengliang Guan

TD-1455 TD-2196

上级 06029954
......@@ -205,6 +205,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VRESION_FILE, 0, 0x050A, "Invalid ve
TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FULL, 0, 0x050B, "Vnode memory is full because commit failed")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, 0, 0x0511, "Database suspended")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, 0, 0x0512, "Write operation denied")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_SYNCING, 0, 0x0513, "Database is syncing")
// tsdb
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, 0, 0x0600, "Invalid table ID")
......
......@@ -250,15 +250,15 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar
atomic_add_fetch_32(&pVnode->refCount, 1);
code = vnodePerformFlowCtrl(pWrite);
if (code != 0) return code;
int32_t queued = atomic_add_fetch_32(&pVnode->queuedWMsg, 1);
if (queued > MAX_QUEUED_MSG_NUM) {
vDebug("vgId:%d, too many msg:%d in vwqueue, flow control", pVnode->vgId, queued);
taosMsleep(1);
}
code = vnodePerformFlowCtrl(pWrite);
if (code != 0) return 0;
vTrace("vgId:%d, write into vwqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedWMsg);
taosWriteQitem(pVnode->wqueue, qtype, pWrite);
......@@ -268,40 +268,50 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar
void vnodeFreeFromWQueue(void *vparam, SVWriteMsg *pWrite) {
SVnodeObj *pVnode = vparam;
atomic_sub_fetch_32(&pVnode->queuedWMsg, 1);
vTrace("vgId:%d, free from vwqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedWMsg);
int32_t queued = atomic_sub_fetch_32(&pVnode->queuedWMsg, 1);
vTrace("vgId:%d, msg:%p, app:%p, free from vwqueue, queued:%d", pVnode->vgId, pWrite, pWrite->rpcMsg.ahandle, queued);
taosFreeQitem(pWrite);
vnodeRelease(pVnode);
}
static void vnodeFlowCtlMsgToWQueue(void *param, void *tmrId) {
static void vnodeFlowCtrlMsgToWQueue(void *param, void *tmrId) {
SVWriteMsg *pWrite = param;
SVnodeObj * pVnode = pWrite->pVnode;
int32_t code = TSDB_CODE_VND_SYNCING;
int32_t code = vnodeWriteToWQueue(pVnode, pWrite->pHead, pWrite->qtype, pWrite->rpcMsg.handle == NULL ? NULL : &pWrite->rpcMsg);
if (code != 0 && pWrite->qtype == TAOS_QTYPE_RPC) {
vDebug("vgId:%d, failed to reprocess msg after perform flowctrl since %s", pVnode->vgId, tstrerror(code));
pWrite->processedCount++;
if (pWrite->processedCount > 100) {
vError("vgId:%d, msg:%p, failed to process since %s", pVnode->vgId, pWrite, tstrerror(code));
pWrite->processedCount = 1;
dnodeSendRpcVWriteRsp(pWrite->pVnode, pWrite, code);
} else {
code = vnodePerformFlowCtrl(pWrite);
if (code == 0) {
vTrace("vgId:%d, write into vwqueue after flowctrl", pVnode->vgId);
pWrite->processedCount = 0;
taosWriteQitem(pVnode->wqueue, pWrite->qtype, pWrite);
}
}
tfree(pWrite);
vnodeRelease(pWrite->pVnode);
}
static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) {
SVnodeObj *pVnode = pWrite->pVnode;
if (pVnode->flowctlLevel <= 0) return 0;
if (pVnode->flowctrlLevel <= 0) return 0;
if (pWrite->qtype != TAOS_QTYPE_RPC) return 0;
if (tsFlowCtrl == 0) {
int32_t ms = pVnode->flowctlLevel * 2;
if (ms > 60000) ms = 60000;
vDebug("vgId:%d, perform flowctrl for %d ms", pVnode->vgId, ms);
int32_t ms = pow(2, pVnode->flowctrlLevel + 2);
if (ms > 100) ms = 100;
vTrace("vgId:%d, msg:%p, app:%p, perform flowctrl for %d ms", pVnode->vgId, pWrite, pWrite->rpcMsg.ahandle, ms);
taosMsleep(ms);
return 0;
} else {
void *unUsed = NULL;
taosTmrReset(vnodeFlowCtlMsgToWQueue, 5, pWrite, tsDnodeTmr, &unUsed);
return TSDB_CODE_RPC_ACTION_IN_PROGRESS;
taosTmrReset(vnodeFlowCtrlMsgToWQueue, 100, pWrite, tsDnodeTmr, &unUsed);
vTrace("vgId:%d, msg:%p, app:%p, perform flowctrl, count:%d", pVnode->vgId, pWrite, pWrite->rpcMsg.ahandle,
pWrite->processedCount);
return TSDB_CODE_VND_ACTION_IN_PROGRESS;
}
}
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/cfg.sh -n dnode1 -c numOfMnodes -v 3
system sh/cfg.sh -n dnode2 -c numOfMnodes -v 3
system sh/cfg.sh -n dnode3 -c numOfMnodes -v 3
system sh/cfg.sh -n dnode1 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode2 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode3 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode1 -c http -v 0
system sh/cfg.sh -n dnode2 -c http -v 0
system sh/cfg.sh -n dnode3 -c http -v 0
system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 20000
system sh/cfg.sh -n dnode2 -c maxTablesPerVnode -v 20000
system sh/cfg.sh -n dnode3 -c maxTablesPerVnode -v 20000
system sh/cfg.sh -n dnode1 -c maxVgroupsPerDb -v 20
system sh/cfg.sh -n dnode2 -c maxVgroupsPerDb -v 20
system sh/cfg.sh -n dnode3 -c maxVgroupsPerDb -v 20
system sh/cfg.sh -n dnode1 -c replica -v 3
system sh/cfg.sh -n dnode2 -c replica -v 3
system sh/cfg.sh -n dnode3 -c replica -v 3
print ============== deploy
system sh/exec.sh -n dnode1 -s start
sleep 5001
sql connect
sql create dnode $hostname2
sql create dnode $hostname3
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
print =============== step1
$x = 0
show1:
$x = $x + 1
sleep 2000
if $x == 5 then
return -1
endi
sql show mnodes -x show1
$mnode1Role = $data2_1
print mnode1Role $mnode1Role
$mnode2Role = $data2_2
print mnode2Role $mnode2Role
$mnode3Role = $data2_3
print mnode3Role $mnode3Role
if $mnode1Role != master then
goto show1
endi
if $mnode2Role != slave then
goto show1
endi
if $mnode3Role != slave then
goto show1
endi
print =============== step2
sql create database db replica 3
sql use db
sql create table tb (ts timestamp, test int)
$x = 0
while $x < 100
$ms = $x . s
sql insert into tb values (now + $ms , $x )
$x = $x + 1
endw
print =============== step3
sleep 3000
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
print =============== step4
sleep 5000
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
print =============== step5
sleep 8000
while $x < 200
$ms = $x . s
sql insert into tb values (now + $ms , $x )
$x = $x + 1
endw
print =============== step6
system sh/exec.sh -n dnode2 -s stop -x SIGINT
sleep 3000
while $x < 300
$ms = $x . s
sql insert into tb values (now + $ms , $x )
$x = $x + 1
endw
system sh/exec.sh -n dnode2 -s start
sleep 6000
print =============== step7
while $x < 400
$ms = $x . s
sql insert into tb values (now + $ms , $x )
$x = $x + 1
sleep 1
endw
print =============== step8
sql select * from tb
print rows $rows
if $rows != 400 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册