提交 1157bca4 编写于 作者: S Shengliang Guan

enh: adjust vnode replica

上级 739807fd
......@@ -91,51 +91,52 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
}
static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
int32_t code = 0;
SRpcMsg *pMsg = NULL;
SVnodeObj *pVnode = pInfo->ahandle;
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *));
if (pArray == NULL) {
dError("failed to process %d msgs in write-queue since %s", numOfMsgs, terrstr());
return;
}
int64_t sync = vnodeGetSyncHandle(pVnode->pImpl);
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg **));
for (int32_t i = 0; i < numOfMsgs; ++i) {
SRpcMsg *pMsg = NULL;
for (int32_t m = 0; m < numOfMsgs; m++) {
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
dTrace("vgId:%d, get msg:%p from vnode-write queue, type:%s", pVnode->vgId, pMsg, TMSG_INFO(pMsg->msgType));
dTrace("msg:%p, get from vnode-write queue", pMsg);
if (taosArrayPush(pArray, &pMsg) == NULL) {
dTrace("msg:%p, failed to push to array since %s", pMsg, terrstr());
dError("vgId:%d, failed to push msg:%p to vnode-write array", pVnode->vgId, pMsg);
vmSendRsp(pMsg, TSDB_CODE_OUT_OF_MEMORY);
}
}
for (int i = 0; i < taosArrayGetSize(pArray); i++) {
SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
SRpcMsg rsp = {.info = pMsg->info};
for (int32_t m = 0; m < taosArrayGetSize(pArray); m++) {
pMsg = *(SRpcMsg **)taosArrayGet(pArray, m);
code = vnodePreprocessReq(pVnode->pImpl, pMsg);
vnodePreprocessReq(pVnode->pImpl, pMsg);
if (code == TSDB_CODE_ACTION_IN_PROGRESS) continue;
if (code != 0) {
dError("vgId:%d, msg:%p failed to process since %s", pVnode->vgId, pMsg, tstrerror(code));
vmSendRsp(pMsg, code);
continue;
}
int32_t ret = syncPropose(vnodeGetSyncHandle(pVnode->pImpl), pMsg, false);
if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) {
dTrace("msg:%p, is redirect since not leader, vgId:%d ", pMsg, pVnode->vgId);
rsp.code = TSDB_CODE_RPC_REDIRECT;
SEpSet newEpSet;
syncGetEpSet(vnodeGetSyncHandle(pVnode->pImpl), &newEpSet);
code = syncPropose(sync, pMsg, false);
if (code == TAOS_SYNC_PROPOSE_SUCCESS) {
continue;
} else if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) {
dTrace("vgId:%d, msg:%p is redirect since not leader", pVnode->vgId, pMsg);
SEpSet newEpSet = {0};
syncGetEpSet(sync, &newEpSet);
newEpSet.inUse = (newEpSet.inUse + 1) % newEpSet.numOfEps;
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
tmsgSendRedirectRsp(&rsp, &newEpSet);
} else if (ret == TAOS_SYNC_PROPOSE_OTHER_ERROR) {
rsp.code = TSDB_CODE_SYN_INTERNAL_ERROR;
tmsgSendRsp(&rsp);
} else if (ret == TAOS_SYNC_PROPOSE_SUCCESS) {
// send response in applyQ
} else {
assert(0);
dError("vgId:%d, msg:%p failed to process since %s", pVnode->vgId, pMsg, tstrerror(code));
vmSendRsp(pMsg, code);
}
}
for (int32_t i = 0; i < numOfMsgs; i++) {
SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
dTrace("msg:%p, is freed", pMsg);
pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
dTrace("vgId:%d, msg:%p, is freed", pVnode->vgId, pMsg);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
......
......@@ -84,7 +84,7 @@ int32_t vnodeAsyncCommit(SVnode* pVnode);
int32_t vnodeSyncOpen(SVnode* pVnode, char* path);
void vnodeSyncStart(SVnode* pVnode);
void vnodeSyncClose(SVnode* pVnode);
void vnodeSyncAlter(SVnode* pVnode, SRpcMsg* pMsg);
int32_t vnodeSyncAlter(SVnode* pVnode, SRpcMsg* pMsg);
#ifdef __cplusplus
}
......
......@@ -25,6 +25,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = 0;
SDecoder dc = {0};
switch (pMsg->msgType) {
......@@ -89,13 +90,13 @@ int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) {
} break;
case TDMT_VND_ALTER_REPLICA: {
vnodeSyncAlter(pVnode, pMsg);
code = vnodeSyncAlter(pVnode, pMsg);
} break;
default:
break;
}
return 0;
return code;
}
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) {
......
......@@ -50,13 +50,11 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
return 0;
}
void vnodeSyncAlter(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t vnodeSyncAlter(SVnode *pVnode, SRpcMsg *pMsg) {
SAlterVnodeReq req = {0};
if (tDeserializeSAlterVnodeReq((char *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead), &req) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
vError("vgId:%d, failed to alter replica since %s", TD_VID(pVnode), terrstr());
SRpcMsg rsp = {.info = pMsg->info, .code = terrno};
tmsgSendRsp(&rsp);
return TSDB_CODE_INVALID_MSG;
}
vInfo("vgId:%d, start to alter vnode replica to %d", TD_VID(pVnode), req.replica);
......@@ -68,11 +66,15 @@ void vnodeSyncAlter(SVnode *pVnode, SRpcMsg *pMsg) {
vInfo("vgId:%d, replica:%d %s:%u", TD_VID(pVnode), r, pNode->nodeFqdn, pNode->nodePort);
}
if (syncReconfig(pVnode->sync, &cfg) != 0) {
vError("vgId:%d, failed to propose sync reconfig since %s", TD_VID(pVnode), terrstr());
int32_t code = syncReconfig(pVnode->sync, &cfg);
if (code == TAOS_SYNC_PROPOSE_SUCCESS) {
// todo refactor
SRpcMsg rsp = {.info = pMsg->info, .code = terrno};
tmsgSendRsp(&rsp);
return TSDB_CODE_ACTION_IN_PROGRESS;
}
return code;
}
void vnodeSyncStart(SVnode *pVnode) {
......
......@@ -12,11 +12,11 @@ sql connect
print =============== step1: create dnodes
sql create dnode $hostname port 7200
$loop_cnt = 0
$x = 0
step1:
$loop_cnt = $loop_cnt + 1
$ = $x + 1
sleep 1000
if $loop_cnt == 10 then
if $x == 10 then
print ====> dnode not ready!
return -1
endi
......@@ -73,11 +73,11 @@ print =============== step3: create dnodes
sql create dnode $hostname port 7300
sql create dnode $hostname port 7400
$loop_cnt = 0
$x = 0
step3:
$loop_cnt = $loop_cnt + 1
$x = $x + 1
sleep 1000
if $loop_cnt == 10 then
if $x == 10 then
print ====> dnode not ready!
return -1
endi
......@@ -118,7 +118,7 @@ if $rows != 1 then
return -1
endi
return
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册