提交 785f25c4 编写于 作者: M Minghao Li

sync integration

上级 9a792404
...@@ -117,6 +117,36 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO ...@@ -117,6 +117,36 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
vnodePreprocessWriteReqs(pVnode->pImpl, pArray); vnodePreprocessWriteReqs(pVnode->pImpl, pArray);
// sync integration response
/*
for (int i = 0; i < taosArrayGetSize(pArray); i++) {
SNodeMsg *pMsg;
SRpcMsg *pRpc;
pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
pRpc = &pMsg->rpcMsg;
// set request version
void *pBuf = POINTER_SHIFT(pRpc->pCont, sizeof(SMsgHead));
// int64_t ver = pVnode->pImpl->state.processed++; // ???????
int64_t ver;
taosEncodeFixedI64(&pBuf, ver);
int32_t ret = syncPropose(pVnode->pImpl->sync, pRpc, false);
if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) {
// not leader
// send response
} else if (ret == TAOS_SYNC_PROPOSE_OTHER_ERROR) {
// send response
} else if (ret == TAOS_SYNC_PROPOSE_SUCCESS) {
// ok
// send response in applyQ
} else {
assert(0);
}
}
*/
numOfMsgs = taosArrayGetSize(pArray); numOfMsgs = taosArrayGetSize(pArray);
for (int32_t i = 0; i < numOfMsgs; i++) { for (int32_t i = 0; i < numOfMsgs; i++) {
SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i); SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
...@@ -154,6 +184,9 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO ...@@ -154,6 +184,9 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
// todo // todo
SRpcMsg *pRsp = NULL; SRpcMsg *pRsp = NULL;
(void)vnodeProcessWriteReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp); (void)vnodeProcessWriteReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp);
// sync integration response
// send response
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册