diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index d3e97517388da3a83defa14324c55b9929bb9c61..7552428a1467793740c3722f9fdc2fb95d6cbea6 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -405,9 +405,9 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) { static void vnodeNotifyFileSynced(void *ahandle) { SVnodeObj *pVnode = ahandle; - vTrace("pVnode:%p vgId:%d, data file is synced", pVnode, pVnode->vgId); + vTrace("vgId:%d, data file is synced", pVnode->vgId); - // clsoe tsdb, then open tsdb + // close tsdb, then open tsdb } static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index d2dd1811cf61cf3c5f30964942d428cebf570a77..03407a5aa217306c5480a50f30fe09f0aceec538 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -65,7 +65,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t cont pRet->len = sizeof(SQueryTableRsp); pRet->rsp = pRsp; - vTrace("vgId:%d QInfo:%p, dnode query msg disposed", pVnode->vgId, pQInfo); + vTrace("vgId:%d, QInfo:%p, dnode query msg disposed", pVnode->vgId, pQInfo); } else { pQInfo = pCont; code = TSDB_CODE_ACTION_IN_PROGRESS; @@ -83,7 +83,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, void *pCont, int32_t cont int32_t code = TSDB_CODE_SUCCESS; - vTrace("vgId:%d QInfo:%p, retrieve msg is received", pVnode->vgId, pQInfo); + vTrace("vgId:%d, QInfo:%p, retrieve msg is received", pVnode->vgId, pQInfo); pRet->code = qRetrieveQueryResultInfo(pQInfo); if (pRet->code != TSDB_CODE_SUCCESS) { @@ -104,6 +104,6 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, void *pCont, int32_t cont } } - vTrace("vgId:%d QInfo:%p, retrieve msg is disposed", pVnode->vgId, pQInfo); + vTrace("vgId:%d, QInfo:%p, retrieve msg is disposed", pVnode->vgId, pQInfo); return code; } diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 3fc079ca13d711509679634f23e7327cc5d37cc9..ec0a3b2f0b55e544aefd26ffa6588a1015023c68 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -51,10 +51,7 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL) return TSDB_CODE_MSG_NOT_PROCESSED; - if (pVnode->status != TAOS_VN_STATUS_READY && qtype == TAOS_QTYPE_RPC) - return TSDB_CODE_NOT_ACTIVE_VNODE; - - if (pHead->version == 0) { // from client + if (pHead->version == 0) { // from client or CQ if (pVnode->status != TAOS_VN_STATUS_READY) return TSDB_CODE_NOT_ACTIVE_VNODE; @@ -64,12 +61,10 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { // assign version pVnode->version++; pHead->version = pVnode->version; - } else { + } else { // from wal or forward // for data from WAL or forward, version may be smaller if (pHead->version <= pVnode->version) return 0; } - - // more status and role checking here pVnode->version = pHead->version; @@ -77,9 +72,13 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { code = walWrite(pVnode->wal, pHead); if (code < 0) return code; - int32_t syncCode = syncForwardToPeer(pVnode->sync, pHead, item); + // forward to peers if data is from RPC or CQ + int32_t syncCode = 0; + if (qtype == TAOS_QTYPE_RPC || qtype == TAOS_QTYPE_CQ) + syncCode = syncForwardToPeer(pVnode->sync, pHead, item); if (syncCode < 0) return syncCode; + // write data locally code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, item); if (code < 0) return code;