提交 a13f9acf 编写于 作者: S Shengliang Guan

TD-1652

上级 8edc164c
...@@ -244,6 +244,9 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -244,6 +244,9 @@ static void *dnodeProcessWriteQueue(void *param) {
} }
int32_t code = vnodeProcessWrite(pVnode, type, pHead, pRspRet); int32_t code = vnodeProcessWrite(pVnode, type, pHead, pRspRet);
dTrace("%p, wal msg:%s is processed in vwrite queue, version:%" PRIu64 ", result:%s", pHead,
taosMsg[pHead->msgType], pHead->version, tstrerror(code));
if (pWrite) { if (pWrite) {
pWrite->rpcMsg.code = code; pWrite->rpcMsg.code = code;
if (code <= 0) pWrite->processedCount = 1; if (code <= 0) pWrite->processedCount = 1;
......
...@@ -247,6 +247,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_CPU_LIMITED, 0, 0x080B, "CPU cores ...@@ -247,6 +247,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_CPU_LIMITED, 0, 0x080B, "CPU cores
// sync // sync
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_CONFIG, 0, 0x0900, "Invalid Sync Configuration") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_CONFIG, 0, 0x0900, "Invalid Sync Configuration")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_ENABLED, 0, 0x0901, "Sync module not enabled") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_ENABLED, 0, 0x0901, "Sync module not enabled")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_VERSION, 0, 0x0902, "Invalid sync version")
// wal // wal
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal") TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal")
......
...@@ -341,18 +341,19 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) { ...@@ -341,18 +341,19 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) {
pPeer = pNode->peerInfo[i]; pPeer = pNode->peerInfo[i];
syncRestartConnection(pPeer); syncRestartConnection(pPeer);
} }
return TSDB_CODE_SYN_INVALID_VERSION;
} else { } else {
// always update version // always update version
nodeVersion = pWalHead->version; nodeVersion = pWalHead->version;
sDebug("vgId:%d, update version, replica:%d nodeRole:%d qtype:%d ver:%" PRIu64, pNode->vgId, pNode->replica, sDebug("vgId:%d, update version, replica:%d nodeRole:%s qtype:%d ver:%" PRIu64, pNode->vgId, pNode->replica,
nodeRole, qtype, pWalHead->version); syncRole[nodeRole], qtype, pWalHead->version);
return TSDB_CODE_SUCCESS;
} }
return 0;
} }
nodeVersion = pWalHead->version; nodeVersion = pWalHead->version;
sDebug("vgId:%d, forward will send, replica:%d nodeRole:%d qtype:%d ver:%" PRIu64, pNode->vgId, pNode->replica, sDebug("vgId:%d, forward will send, replica:%d nodeRole:%s qtype:%d ver:%" PRIu64, pNode->vgId, pNode->replica,
nodeRole, qtype, pWalHead->version); syncRole[nodeRole], qtype, pWalHead->version);
// only pkt from RPC or CQ can be forwarded // only pkt from RPC or CQ can be forwarded
if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0; if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0;
......
...@@ -59,8 +59,9 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { ...@@ -59,8 +59,9 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
// TODO: Later, let slave to support query // TODO: Later, let slave to support query
// if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) { // if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
if (pVnode->role != TAOS_SYNC_ROLE_SLAVE && pVnode->role != TAOS_SYNC_ROLE_MASTER) { if (pVnode->role != TAOS_SYNC_ROLE_SLAVE && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[msgType], pVnode->syncCfg.replica, pVnode->role); vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%s", pVnode->vgId, taosMsg[msgType],
pVnode->syncCfg.replica, syncRole[pVnode->role]);
return TSDB_CODE_APP_NOT_READY; return TSDB_CODE_APP_NOT_READY;
} }
......
...@@ -71,7 +71,8 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { ...@@ -71,7 +71,8 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
} }
if (pVnode->role != TAOS_SYNC_ROLE_MASTER) { if (pVnode->role != TAOS_SYNC_ROLE_MASTER) {
vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[pHead->msgType], pVnode->syncCfg.replica, pVnode->role); vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%s", pVnode->vgId, taosMsg[pHead->msgType],
pVnode->syncCfg.replica, syncRole[pVnode->role]);
return TSDB_CODE_APP_NOT_READY; return TSDB_CODE_APP_NOT_READY;
} }
...@@ -85,18 +86,24 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { ...@@ -85,18 +86,24 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
if (pHead->version <= pVnode->version) return 0; if (pHead->version <= pVnode->version) return 0;
} }
int64_t lastVersion = pVnode->version;
pVnode->version = pHead->version; pVnode->version = pHead->version;
// write into WAL // write into WAL
code = walWrite(pVnode->wal, pHead); code = walWrite(pVnode->wal, pHead);
if (code < 0) return code; if (code < 0) return code;
// forward to peers, even it is WAL/FWD, it shall be called to update version in sync // forward to peers, even it is WAL/FWD, it shall be called to update version in sync
int32_t syncCode = 0; int32_t syncCode = 0;
syncCode = syncForwardToPeer(pVnode->sync, pHead, item, qtype); syncCode = syncForwardToPeer(pVnode->sync, pHead, item, qtype);
if (syncCode < 0) return syncCode; if (syncCode < 0) {
pVnode->version = lastVersion;
vError("vgId:%d, msgType:%s not processed, reason:%s, wal ver:%" PRIu64 " restore to last ver:%" PRIu64,
pVnode->vgId, taosMsg[pHead->msgType], tstrerror(syncCode), pHead->version, pVnode->version);
return syncCode;
}
// write data locally // write data locally
code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, item); code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, item);
if (code < 0) return code; if (code < 0) return code;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册