提交 5e18e07a 编写于 作者: S Shengliang Guan

refactor: pre process vnode msg

上级 25c52f20
...@@ -238,9 +238,9 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) { ...@@ -238,9 +238,9 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
} }
int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode->pImpl, (FItems)vnodeProposeMsg); pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode->pImpl, (FItems)vnodeProposeWriteMsg);
pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue); pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue);
pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->applyPool, pVnode->pImpl, (FItems)vnodeApplyMsg); pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->applyPool, pVnode->pImpl, (FItems)vnodeApplyWriteMsg);
pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue); pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue); pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue);
......
...@@ -52,15 +52,7 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs); ...@@ -52,15 +52,7 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
void vnodeDestroy(const char *path, STfs *pTfs); void vnodeDestroy(const char *path, STfs *pTfs);
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb); SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
void vnodeClose(SVnode *pVnode); void vnodeClose(SVnode *pVnode);
int32_t vnodePreProcessReq(SVnode *pVnode, SRpcMsg *pMsg);
int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp);
int32_t vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
int32_t vnodeValidateTableHash(SVnode *pVnode, char *tableFName);
int32_t vnodeStart(SVnode *pVnode); int32_t vnodeStart(SVnode *pVnode);
void vnodeStop(SVnode *pVnode); void vnodeStop(SVnode *pVnode);
int64_t vnodeGetSyncHandle(SVnode *pVnode); int64_t vnodeGetSyncHandle(SVnode *pVnode);
...@@ -69,10 +61,20 @@ void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId); ...@@ -69,10 +61,20 @@ void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId);
int32_t vnodeSnapshotReaderOpen(SVnode *pVnode, SVSnapshotReader **ppReader, int64_t sver, int64_t ever); int32_t vnodeSnapshotReaderOpen(SVnode *pVnode, SVSnapshotReader **ppReader, int64_t sver, int64_t ever);
int32_t vnodeSnapshotReaderClose(SVSnapshotReader *pReader); int32_t vnodeSnapshotReaderClose(SVSnapshotReader *pReader);
int32_t vnodeSnapshotRead(SVSnapshotReader *pReader, const void **ppData, uint32_t *nData); int32_t vnodeSnapshotRead(SVSnapshotReader *pReader, const void **ppData, uint32_t *nData);
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen);
void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); int32_t vnodeValidateTableHash(SVnode *pVnode, char *tableFName);
int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg);
int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp);
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen);
// meta // meta
typedef struct SMeta SMeta; // todo: remove typedef struct SMeta SMeta; // todo: remove
......
...@@ -94,6 +94,8 @@ int32_t vnodeAsyncCommit(SVnode* pVnode); ...@@ -94,6 +94,8 @@ int32_t vnodeAsyncCommit(SVnode* pVnode);
int32_t vnodeSyncOpen(SVnode* pVnode, char* path); int32_t vnodeSyncOpen(SVnode* pVnode, char* path);
void vnodeSyncStart(SVnode* pVnode); void vnodeSyncStart(SVnode* pVnode);
void vnodeSyncClose(SVnode* pVnode); void vnodeSyncClose(SVnode* pVnode);
void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg);
bool vnodeIsLeader(SVnode* pVnode);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -25,10 +25,10 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq ...@@ -25,10 +25,10 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRpcMsg *pRsp); static int32_t vnodeProcessWriteReq(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRpcMsg *pRsp);
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
int32_t vnodePreProcessReq(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = 0; int32_t code = 0;
SDecoder dc = {0}; SDecoder dc = {0};
...@@ -100,7 +100,7 @@ int32_t vnodePreProcessReq(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -100,7 +100,7 @@ int32_t vnodePreProcessReq(SVnode *pVnode, SRpcMsg *pMsg) {
return code; return code;
} }
int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) { int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) {
void *ptr = NULL; void *ptr = NULL;
void *pReq; void *pReq;
int32_t len; int32_t len;
...@@ -146,7 +146,7 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp ...@@ -146,7 +146,7 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
if (vnodeProcessSubmitReq(pVnode, version, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err; if (vnodeProcessSubmitReq(pVnode, version, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err;
break; break;
case TDMT_VND_DELETE: case TDMT_VND_DELETE:
if (vnodeProcessWriteMsg(pVnode, version, pMsg, pRsp) < 0) goto _err; if (vnodeProcessWriteReq(pVnode, version, pMsg, pRsp) < 0) goto _err;
break; break;
/* TQ */ /* TQ */
case TDMT_VND_MQ_VG_CHANGE: case TDMT_VND_MQ_VG_CHANGE:
...@@ -225,6 +225,11 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -225,6 +225,11 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
vTrace("message in vnode query queue is processing"); vTrace("message in vnode query queue is processing");
if (!vnodeIsLeader(pVnode)) {
vnodeRedirectRpcMsg(pVnode, pMsg);
return 0;
}
SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_SCH_QUERY: case TDMT_SCH_QUERY:
...@@ -240,6 +245,11 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -240,6 +245,11 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
vTrace("message in fetch queue is processing"); vTrace("message in fetch queue is processing");
if (!vnodeIsLeader(pVnode)) {
vnodeRedirectRpcMsg(pVnode, pMsg);
return 0;
}
char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
...@@ -280,7 +290,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { ...@@ -280,7 +290,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
} }
} }
int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRpcMsg *pRsp) { int32_t vnodeProcessWriteReq(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRpcMsg *pRsp) {
vTrace("message in write queue is processing"); vTrace("message in write queue is processing");
char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
......
...@@ -120,7 +120,24 @@ static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -120,7 +120,24 @@ static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
return code; return code;
} }
void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg) {
SEpSet newEpSet = {0};
syncGetRetryEpSet(pVnode->sync, &newEpSet);
const STraceId *trace = &pMsg->info.traceId;
vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", pVnode->config.vgId, pMsg,
newEpSet.numOfEps, newEpSet.inUse);
for (int32_t i = 0; i < newEpSet.numOfEps; ++i) {
vGTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", pVnode->config.vgId, pMsg, i, newEpSet.eps[i].fqdn,
newEpSet.eps[i].port);
}
pMsg->info.hasEpSet = 1;
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
tmsgSendRedirectRsp(&rsp, &newEpSet);
}
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnode *pVnode = pInfo->ahandle; SVnode *pVnode = pInfo->ahandle;
int32_t vgId = pVnode->config.vgId; int32_t vgId = pVnode->config.vgId;
int32_t code = 0; int32_t code = 0;
...@@ -131,7 +148,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { ...@@ -131,7 +148,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
const STraceId *trace = &pMsg->info.traceId; const STraceId *trace = &pMsg->info.traceId;
vGTrace("vgId:%d, msg:%p get from vnode-write queue handle:%p", vgId, pMsg, pMsg->info.handle); vGTrace("vgId:%d, msg:%p get from vnode-write queue handle:%p", vgId, pMsg, pMsg->info.handle);
code = vnodePreProcessReq(pVnode, pMsg); code = vnodePreProcessWriteMsg(pVnode, pMsg);
if (code != 0) { if (code != 0) {
vError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr()); vError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr());
} else { } else {
...@@ -141,7 +158,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { ...@@ -141,7 +158,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
code = syncPropose(pVnode->sync, pMsg, vnodeIsMsgWeak(pMsg->msgType)); code = syncPropose(pVnode->sync, pMsg, vnodeIsMsgWeak(pMsg->msgType));
if (code > 0) { if (code > 0) {
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info}; SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
if (vnodeProcessWriteReq(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) { if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
rsp.code = terrno; rsp.code = terrno;
vError("vgId:%d, msg:%p failed to apply right now since %s", vgId, pMsg, terrstr()); vError("vgId:%d, msg:%p failed to apply right now since %s", vgId, pMsg, terrstr());
} }
...@@ -156,16 +173,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { ...@@ -156,16 +173,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
vnodeAccumBlockMsg(pVnode, pMsg->msgType); vnodeAccumBlockMsg(pVnode, pMsg->msgType);
} else if (code < 0) { } else if (code < 0) {
if (terrno == TSDB_CODE_SYN_NOT_LEADER) { if (terrno == TSDB_CODE_SYN_NOT_LEADER) {
SEpSet newEpSet = {0}; vnodeRedirectRpcMsg(pVnode, pMsg);
syncGetRetryEpSet(pVnode->sync, &newEpSet);
vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", vgId, pMsg, newEpSet.numOfEps,
newEpSet.inUse);
for (int32_t i = 0; i < newEpSet.numOfEps; ++i) {
vGTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", vgId, pMsg, i, newEpSet.eps[i].fqdn, newEpSet.eps[i].port);
}
pMsg->info.hasEpSet = 1;
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
tmsgSendRedirectRsp(&rsp, &newEpSet);
} else { } else {
if (terrno != 0) code = terrno; if (terrno != 0) code = terrno;
vError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", vgId, pMsg, tstrerror(code), code); vError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", vgId, pMsg, tstrerror(code), code);
...@@ -185,7 +193,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { ...@@ -185,7 +193,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
vnodeWaitBlockMsg(pVnode); vnodeWaitBlockMsg(pVnode);
} }
void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnode *pVnode = pInfo->ahandle; SVnode *pVnode = pInfo->ahandle;
int32_t vgId = pVnode->config.vgId; int32_t vgId = pVnode->config.vgId;
int32_t code = 0; int32_t code = 0;
...@@ -199,7 +207,7 @@ void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { ...@@ -199,7 +207,7 @@ void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info}; SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
if (rsp.code == 0) { if (rsp.code == 0) {
if (vnodeProcessWriteReq(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) { if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
rsp.code = terrno; rsp.code = terrno;
vError("vgId:%d, msg:%p failed to apply since %s", vgId, pMsg, terrstr()); vError("vgId:%d, msg:%p failed to apply since %s", vgId, pMsg, terrstr());
} }
...@@ -500,3 +508,17 @@ void vnodeSyncStart(SVnode *pVnode) { ...@@ -500,3 +508,17 @@ void vnodeSyncStart(SVnode *pVnode) {
} }
void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); } void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
bool vnodeIsLeader(SVnode *pVnode) {
if (!syncIsReady(pVnode->sync)) {
return false;
}
// todo
// if (!pVnode->restored) {
// terrno = TSDB_CODE_APP_NOT_READY;
// return false;
// }
return true;
}
\ No newline at end of file
...@@ -616,8 +616,6 @@ void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) { ...@@ -616,8 +616,6 @@ void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) {
} }
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) { int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
int32_t ret = 0;
SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
taosReleaseRef(tsNodeRefId, rid); taosReleaseRef(tsNodeRefId, rid);
...@@ -625,8 +623,8 @@ int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) { ...@@ -625,8 +623,8 @@ int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
return -1; return -1;
} }
ASSERT(rid == pSyncNode->rid); ASSERT(rid == pSyncNode->rid);
ret = syncNodePropose(pSyncNode, pMsg, isWeak);
int32_t ret = syncNodePropose(pSyncNode, pMsg, isWeak);
taosReleaseRef(tsNodeRefId, pSyncNode->rid); taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return ret; return ret;
} }
...@@ -637,15 +635,14 @@ int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_ ...@@ -637,15 +635,14 @@ int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_
return -1; return -1;
} }
int32_t ret = 0;
SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1; return -1;
} }
ASSERT(rid == pSyncNode->rid); ASSERT(rid == pSyncNode->rid);
ret = syncNodeProposeBatch(pSyncNode, pMsgArr, pIsWeakArr, arrSize);
int32_t ret = syncNodeProposeBatch(pSyncNode, pMsgArr, pIsWeakArr, arrSize);
taosReleaseRef(tsNodeRefId, pSyncNode->rid); taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return ret; return ret;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册