提交 2854d610 编写于 作者: S Shengliang Guan

TD-3308

上级 851975db
...@@ -222,7 +222,7 @@ static void *dnodeProcessVWriteQueue(void *wparam) { ...@@ -222,7 +222,7 @@ static void *dnodeProcessVWriteQueue(void *wparam) {
dnodeSendRpcVWriteRsp(pVnode, pWrite, pWrite->code); dnodeSendRpcVWriteRsp(pVnode, pWrite, pWrite->code);
} else { } else {
if (qtype == TAOS_QTYPE_FWD) { if (qtype == TAOS_QTYPE_FWD) {
vnodeConfirmForward(pVnode, pWrite->pHead.version, 0); vnodeConfirmForward(pVnode, pWrite->pHead.version, 0, pWrite->pHead.msgType != TSDB_MSG_TYPE_SUBMIT);
} }
if (pWrite->rspRet.rsp) { if (pWrite->rspRet.rsp) {
rpcFreeCont(pWrite->rspRet.rsp); rpcFreeCont(pWrite->rspRet.rsp);
......
...@@ -112,8 +112,8 @@ void syncCleanUp(); ...@@ -112,8 +112,8 @@ void syncCleanUp();
int64_t syncStart(const SSyncInfo *); int64_t syncStart(const SSyncInfo *);
void syncStop(int64_t rid); void syncStop(int64_t rid);
int32_t syncReconfig(int64_t rid, const SSyncCfg *); int32_t syncReconfig(int64_t rid, const SSyncCfg *);
int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int32_t qtype); int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int32_t qtype, bool force);
void syncConfirmForward(int64_t rid, uint64_t version, int32_t code); void syncConfirmForward(int64_t rid, uint64_t version, int32_t code, bool force);
void syncRecover(int64_t rid); // recover from other nodes: void syncRecover(int64_t rid); // recover from other nodes:
int32_t syncGetNodesRole(int64_t rid, SNodesRole *); int32_t syncGetNodesRole(int64_t rid, SNodesRole *);
......
...@@ -78,7 +78,7 @@ void vnodeFreeFromWQueue(void *pVnode, SVWriteMsg *pWrite); ...@@ -78,7 +78,7 @@ void vnodeFreeFromWQueue(void *pVnode, SVWriteMsg *pWrite);
int32_t vnodeProcessWrite(void *pVnode, void *pHead, int32_t qtype, void *pRspRet); int32_t vnodeProcessWrite(void *pVnode, void *pHead, int32_t qtype, void *pRspRet);
// vnodeSync // vnodeSync
void vnodeConfirmForward(void *pVnode, uint64_t version, int32_t code); void vnodeConfirmForward(void *pVnode, uint64_t version, int32_t code, bool force);
// vnodeRead // vnodeRead
int32_t vnodeWriteToRQueue(void *pVnode, void *pCont, int32_t contLen, int8_t qtype, void *rparam); int32_t vnodeWriteToRQueue(void *pVnode, void *pCont, int32_t contLen, int8_t qtype, void *rparam);
......
...@@ -680,7 +680,7 @@ static int32_t sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void * ...@@ -680,7 +680,7 @@ static int32_t sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void *
if (pRow != NULL) { if (pRow != NULL) {
// forward to peers // forward to peers
pRow->processedCount = 0; pRow->processedCount = 0;
int32_t syncCode = syncForwardToPeer(tsSdbMgmt.sync, pHead, pRow, TAOS_QTYPE_RPC); int32_t syncCode = syncForwardToPeer(tsSdbMgmt.sync, pHead, pRow, TAOS_QTYPE_RPC, false);
if (syncCode <= 0) pRow->processedCount = 1; if (syncCode <= 0) pRow->processedCount = 1;
if (syncCode < 0) { if (syncCode < 0) {
...@@ -700,7 +700,7 @@ static int32_t sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void * ...@@ -700,7 +700,7 @@ static int32_t sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void *
actStr[action], sdbGetKeyStr(pTable, pHead->cont), pHead->version); actStr[action], sdbGetKeyStr(pTable, pHead->cont), pHead->version);
// even it is WAL/FWD, it shall be called to update version in sync // even it is WAL/FWD, it shall be called to update version in sync
syncForwardToPeer(tsSdbMgmt.sync, pHead, pRow, TAOS_QTYPE_RPC); syncForwardToPeer(tsSdbMgmt.sync, pHead, pRow, TAOS_QTYPE_RPC, false);
// from wal or forward msg, row not created, should add into hash // from wal or forward msg, row not created, should add into hash
if (action == SDB_ACTION_INSERT) { if (action == SDB_ACTION_INSERT) {
...@@ -1119,7 +1119,7 @@ static void *sdbWorkerFp(void *pWorker) { ...@@ -1119,7 +1119,7 @@ static void *sdbWorkerFp(void *pWorker) {
sdbConfirmForward(1, pRow, pRow->code); sdbConfirmForward(1, pRow, pRow->code);
} else { } else {
if (qtype == TAOS_QTYPE_FWD) { if (qtype == TAOS_QTYPE_FWD) {
syncConfirmForward(tsSdbMgmt.sync, pRow->pHead.version, pRow->code); syncConfirmForward(tsSdbMgmt.sync, pRow->pHead.version, pRow->code, false);
} }
sdbFreeFromQueue(pRow); sdbFreeFromQueue(pRow);
} }
......
...@@ -56,7 +56,7 @@ static void syncMonitorNodeRole(void *param, void *tmrId); ...@@ -56,7 +56,7 @@ static void syncMonitorNodeRole(void *param, void *tmrId);
static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code); static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code);
static int32_t syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle); static int32_t syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle);
static void syncRestartPeer(SSyncPeer *pPeer); static void syncRestartPeer(SSyncPeer *pPeer);
static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int32_t qtyp); static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int32_t qtype, bool force);
static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo); static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo);
static void syncStartCheckPeerConn(SSyncPeer *pPeer); static void syncStartCheckPeerConn(SSyncPeer *pPeer);
...@@ -378,24 +378,24 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) { ...@@ -378,24 +378,24 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) {
return 0; return 0;
} }
int32_t syncForwardToPeer(int64_t rid, void *data, void *mhandle, int32_t qtype) { int32_t syncForwardToPeer(int64_t rid, void *data, void *mhandle, int32_t qtype, bool force) {
if (rid <= 0) return 0; if (rid <= 0) return 0;
SSyncNode *pNode = syncAcquireNode(rid); SSyncNode *pNode = syncAcquireNode(rid);
if (pNode == NULL) return 0; if (pNode == NULL) return 0;
int32_t code = syncForwardToPeerImpl(pNode, data, mhandle, qtype); int32_t code = syncForwardToPeerImpl(pNode, data, mhandle, qtype, force);
syncReleaseNode(pNode); syncReleaseNode(pNode);
return code; return code;
} }
void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) { void syncConfirmForward(int64_t rid, uint64_t version, int32_t code, bool force) {
SSyncNode *pNode = syncAcquireNode(rid); SSyncNode *pNode = syncAcquireNode(rid);
if (pNode == NULL) return; if (pNode == NULL) return;
SSyncPeer *pPeer = pNode->pMaster; SSyncPeer *pPeer = pNode->pMaster;
if (pPeer && pNode->quorum > 1) { if (pPeer && (pNode->quorum > 1 || force)) {
SFwdRsp rsp; SFwdRsp rsp;
syncBuildSyncFwdRsp(&rsp, pNode->vgId, version, code); syncBuildSyncFwdRsp(&rsp, pNode->vgId, version, code);
...@@ -1414,7 +1414,7 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) { ...@@ -1414,7 +1414,7 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
syncReleaseNode(pNode); syncReleaseNode(pNode);
} }
static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int32_t qtype) { static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int32_t qtype, bool force) {
SSyncPeer *pPeer; SSyncPeer *pPeer;
SSyncHead *pSyncHead; SSyncHead *pSyncHead;
SWalHead * pWalHead = data; SWalHead * pWalHead = data;
...@@ -1458,7 +1458,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle ...@@ -1458,7 +1458,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
if (pPeer == NULL || pPeer->peerFd < 0) continue; if (pPeer == NULL || pPeer->peerFd < 0) continue;
if (pPeer->role != TAOS_SYNC_ROLE_SLAVE && pPeer->sstatus != TAOS_SYNC_STATUS_CACHE) continue; if (pPeer->role != TAOS_SYNC_ROLE_SLAVE && pPeer->sstatus != TAOS_SYNC_STATUS_CACHE) continue;
if (pNode->quorum > 1 && code == 0) { if ((pNode->quorum > 1 || force) && code == 0) {
code = syncSaveFwdInfo(pNode, pWalHead->version, mhandle); code = syncSaveFwdInfo(pNode, pWalHead->version, mhandle);
if (code >= 0) code = 1; if (code >= 0) code = 1;
} }
......
...@@ -32,7 +32,7 @@ int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rpar ...@@ -32,7 +32,7 @@ int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rpar
int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver); int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver);
int32_t vnodeResetVersion(int32_t vgId, uint64_t fver); int32_t vnodeResetVersion(int32_t vgId, uint64_t fver);
void vnodeConfirmForward(void *pVnode, uint64_t version, int32_t code); void vnodeConfirmForward(void *pVnode, uint64_t version, int32_t code, bool force);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -174,7 +174,7 @@ int32_t vnodeResetVersion(int32_t vgId, uint64_t fver) { ...@@ -174,7 +174,7 @@ int32_t vnodeResetVersion(int32_t vgId, uint64_t fver) {
return 0; return 0;
} }
void vnodeConfirmForward(void *vparam, uint64_t version, int32_t code) { void vnodeConfirmForward(void *vparam, uint64_t version, int32_t code, bool force) {
SVnodeObj *pVnode = vparam; SVnodeObj *pVnode = vparam;
syncConfirmForward(pVnode->sync, version, code); syncConfirmForward(pVnode->sync, version, code, force);
} }
\ No newline at end of file
...@@ -88,7 +88,8 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara ...@@ -88,7 +88,8 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara
// forward to peers, even it is WAL/FWD, it shall be called to update version in sync // forward to peers, even it is WAL/FWD, it shall be called to update version in sync
int32_t syncCode = 0; int32_t syncCode = 0;
syncCode = syncForwardToPeer(pVnode->sync, pHead, pWrite, qtype); bool force = (pWrite == NULL ? false : pWrite->pHead.msgType != TSDB_MSG_TYPE_SUBMIT);
syncCode = syncForwardToPeer(pVnode->sync, pHead, pWrite, qtype, force);
if (syncCode < 0) return syncCode; if (syncCode < 0) return syncCode;
// write into WAL // write into WAL
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册