提交 16f557a7 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

version in sync is not updated correctly

上级 a2f08cfb
...@@ -95,7 +95,7 @@ typedef void* tsync_h; ...@@ -95,7 +95,7 @@ typedef void* tsync_h;
tsync_h syncStart(const SSyncInfo *); tsync_h syncStart(const SSyncInfo *);
void syncStop(tsync_h shandle); void syncStop(tsync_h shandle);
int syncReconfig(tsync_h shandle, const SSyncCfg *); int syncReconfig(tsync_h shandle, const SSyncCfg *);
int syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle); int syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype);
void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code); void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code);
void syncRecover(tsync_h shandle); // recover from other nodes: void syncRecover(tsync_h shandle); // recover from other nodes:
int syncGetNodesRole(tsync_h shandle, SNodesRole *); int syncGetNodesRole(tsync_h shandle, SNodesRole *);
......
...@@ -227,7 +227,7 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { ...@@ -227,7 +227,7 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
static int32_t sdbForwardToPeer(SWalHead *pHead) { static int32_t sdbForwardToPeer(SWalHead *pHead) {
if (tsSdbObj.sync == NULL) return TSDB_CODE_SUCCESS; if (tsSdbObj.sync == NULL) return TSDB_CODE_SUCCESS;
int32_t code = syncForwardToPeer(tsSdbObj.sync, pHead, (void*)pHead->version); int32_t code = syncForwardToPeer(tsSdbObj.sync, pHead, (void*)pHead->version, TAOS_QTYPE_RPC);
if (code > 0) { if (code > 0) {
sdbTrace("forward request is sent, version:%" PRIu64 ", code:%d", pHead->version, code); sdbTrace("forward request is sent, version:%" PRIu64 ", code:%d", pHead->version, code);
sem_wait(&tsSdbObj.sem); sem_wait(&tsSdbObj.sem);
......
...@@ -46,7 +46,7 @@ static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT; ...@@ -46,7 +46,7 @@ static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT;
#ifndef _SYNC #ifndef _SYNC
tsync_h syncStart(const SSyncInfo *info) { return NULL; } tsync_h syncStart(const SSyncInfo *info) { return NULL; }
int syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle) { return 0; } int syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype) { return 0; }
void syncStop(tsync_h shandle) {} void syncStop(tsync_h shandle) {}
int syncReconfig(tsync_h shandle, const SSyncCfg * cfg) { return 0; } int syncReconfig(tsync_h shandle, const SSyncCfg * cfg) { return 0; }
int syncGetNodesRole(tsync_h shandle, SNodesRole * cfg) { return 0; } int syncGetNodesRole(tsync_h shandle, SNodesRole * cfg) { return 0; }
......
...@@ -72,10 +72,9 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { ...@@ -72,10 +72,9 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
code = walWrite(pVnode->wal, pHead); code = walWrite(pVnode->wal, pHead);
if (code < 0) return code; if (code < 0) return code;
// forward to peers if data is from RPC or CQ // forward to peers, even it is WAL/FWD, it shall be called to update version in sync
int32_t syncCode = 0; int32_t syncCode = 0;
if (qtype == TAOS_QTYPE_RPC || qtype == TAOS_QTYPE_CQ) syncCode = syncForwardToPeer(pVnode->sync, pHead, item, qtype);
syncCode = syncForwardToPeer(pVnode->sync, pHead, item);
if (syncCode < 0) return syncCode; if (syncCode < 0) return syncCode;
// write data locally // write data locally
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册