未验证 提交 05c8a42d 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #1879 from taosdata/hotfix/syncCrash

forward to peer only the data packet is from client or CQ
...@@ -405,9 +405,9 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) { ...@@ -405,9 +405,9 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) {
static void vnodeNotifyFileSynced(void *ahandle) { static void vnodeNotifyFileSynced(void *ahandle) {
SVnodeObj *pVnode = ahandle; SVnodeObj *pVnode = ahandle;
vTrace("pVnode:%p vgId:%d, data file is synced", pVnode, pVnode->vgId); vTrace("vgId:%d, data file is synced", pVnode->vgId);
// clsoe tsdb, then open tsdb // close tsdb, then open tsdb
} }
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
......
...@@ -65,7 +65,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t cont ...@@ -65,7 +65,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t cont
pRet->len = sizeof(SQueryTableRsp); pRet->len = sizeof(SQueryTableRsp);
pRet->rsp = pRsp; pRet->rsp = pRsp;
vTrace("vgId:%d QInfo:%p, dnode query msg disposed", pVnode->vgId, pQInfo); vTrace("vgId:%d, QInfo:%p, dnode query msg disposed", pVnode->vgId, pQInfo);
} else { } else {
pQInfo = pCont; pQInfo = pCont;
code = TSDB_CODE_ACTION_IN_PROGRESS; code = TSDB_CODE_ACTION_IN_PROGRESS;
...@@ -83,7 +83,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, void *pCont, int32_t cont ...@@ -83,7 +83,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, void *pCont, int32_t cont
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
vTrace("vgId:%d QInfo:%p, retrieve msg is received", pVnode->vgId, pQInfo); vTrace("vgId:%d, QInfo:%p, retrieve msg is received", pVnode->vgId, pQInfo);
pRet->code = qRetrieveQueryResultInfo(pQInfo); pRet->code = qRetrieveQueryResultInfo(pQInfo);
if (pRet->code != TSDB_CODE_SUCCESS) { if (pRet->code != TSDB_CODE_SUCCESS) {
...@@ -104,6 +104,6 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, void *pCont, int32_t cont ...@@ -104,6 +104,6 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, void *pCont, int32_t cont
} }
} }
vTrace("vgId:%d QInfo:%p, retrieve msg is disposed", pVnode->vgId, pQInfo); vTrace("vgId:%d, QInfo:%p, retrieve msg is disposed", pVnode->vgId, pQInfo);
return code; return code;
} }
...@@ -51,10 +51,7 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { ...@@ -51,10 +51,7 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL) if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL)
return TSDB_CODE_MSG_NOT_PROCESSED; return TSDB_CODE_MSG_NOT_PROCESSED;
if (pVnode->status != TAOS_VN_STATUS_READY && qtype == TAOS_QTYPE_RPC) if (pHead->version == 0) { // from client or CQ
return TSDB_CODE_NOT_ACTIVE_VNODE;
if (pHead->version == 0) { // from client
if (pVnode->status != TAOS_VN_STATUS_READY) if (pVnode->status != TAOS_VN_STATUS_READY)
return TSDB_CODE_NOT_ACTIVE_VNODE; return TSDB_CODE_NOT_ACTIVE_VNODE;
...@@ -64,22 +61,24 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { ...@@ -64,22 +61,24 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
// assign version // assign version
pVnode->version++; pVnode->version++;
pHead->version = pVnode->version; pHead->version = pVnode->version;
} else { } else { // from wal or forward
// for data from WAL or forward, version may be smaller // for data from WAL or forward, version may be smaller
if (pHead->version <= pVnode->version) return 0; if (pHead->version <= pVnode->version) return 0;
} }
// more status and role checking here
pVnode->version = pHead->version; pVnode->version = pHead->version;
// write into WAL // write into WAL
code = walWrite(pVnode->wal, pHead); code = walWrite(pVnode->wal, pHead);
if (code < 0) return code; if (code < 0) return code;
int32_t syncCode = syncForwardToPeer(pVnode->sync, pHead, item); // forward to peers if data is from RPC or CQ
int32_t syncCode = 0;
if (qtype == TAOS_QTYPE_RPC || qtype == TAOS_QTYPE_CQ)
syncCode = syncForwardToPeer(pVnode->sync, pHead, item);
if (syncCode < 0) return syncCode; if (syncCode < 0) return syncCode;
// write data locally
code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, item); code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, item);
if (code < 0) return code; if (code < 0) return code;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册