diff --git a/src/inc/tsync.h b/src/inc/tsync.h index 137b97e28797d0d6dbc8654ccfc23f3fa99b7760..05d1d93cf60f7f6d60e2361d2f55746d4a17c007 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -95,7 +95,7 @@ typedef void* tsync_h; tsync_h syncStart(const SSyncInfo *); void syncStop(tsync_h shandle); int syncReconfig(tsync_h shandle, const SSyncCfg *); -int syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle); +int syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype); void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code); void syncRecover(tsync_h shandle); // recover from other nodes: int syncGetNodesRole(tsync_h shandle, SNodesRole *); diff --git a/src/mnode/src/mgmtSdb.c b/src/mnode/src/mgmtSdb.c index 087c84effd63e8735a30cc9e738f20050046a3bf..47fb71680abfd12b02b96184e644b25d8422d6bc 100644 --- a/src/mnode/src/mgmtSdb.c +++ b/src/mnode/src/mgmtSdb.c @@ -227,7 +227,7 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { static int32_t sdbForwardToPeer(SWalHead *pHead) { if (tsSdbObj.sync == NULL) return TSDB_CODE_SUCCESS; - int32_t code = syncForwardToPeer(tsSdbObj.sync, pHead, (void*)pHead->version); + int32_t code = syncForwardToPeer(tsSdbObj.sync, pHead, (void*)pHead->version, TAOS_QTYPE_RPC); if (code > 0) { sdbTrace("forward request is sent, version:%" PRIu64 ", code:%d", pHead->version, code); sem_wait(&tsSdbObj.sem); diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 6dabc98ae829dddce569df8a19ee022cdeac30ec..4514d80a5423c1c37fb56f3919a969728c8cae81 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -46,7 +46,7 @@ static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT; #ifndef _SYNC tsync_h syncStart(const SSyncInfo *info) { return NULL; } -int syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle) { return 0; } +int syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype) { return 0; } void syncStop(tsync_h shandle) {} int syncReconfig(tsync_h shandle, const SSyncCfg * cfg) { return 0; } int syncGetNodesRole(tsync_h shandle, SNodesRole * cfg) { return 0; } diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 635c4669782114fa34c420bd9e7d752fc5f24828..9c415d6af756d6c9f015ef50055a00b2c48a910b 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -72,10 +72,9 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { code = walWrite(pVnode->wal, pHead); if (code < 0) return code; - // forward to peers if data is from RPC or CQ + // forward to peers, even it is WAL/FWD, it shall be called to update version in sync int32_t syncCode = 0; - if (qtype == TAOS_QTYPE_RPC || qtype == TAOS_QTYPE_CQ) - syncCode = syncForwardToPeer(pVnode->sync, pHead, item); + syncCode = syncForwardToPeer(pVnode->sync, pHead, item, qtype); if (syncCode < 0) return syncCode; // write data locally