提交 4da465f6 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/3.0' into fix/mnode

...@@ -107,7 +107,7 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf ...@@ -107,7 +107,7 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
const STraceId *trace = &pMsg->info.traceId; const STraceId *trace = &pMsg->info.traceId;
dGTrace("vgId:%d, msg:%p get from vnode-sync queue", pVnode->vgId, pMsg); dGTrace("vgId:%d, msg:%p get from vnode-sync queue", pVnode->vgId, pMsg);
int32_t code = vnodeProcessSyncReq(pVnode->pImpl, pMsg, NULL); // no response here int32_t code = vnodeProcessSyncMsg(pVnode->pImpl, pMsg, NULL); // no response here
dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code); dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
......
...@@ -52,10 +52,10 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs); ...@@ -52,10 +52,10 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
void vnodeDestroy(const char *path, STfs *pTfs); void vnodeDestroy(const char *path, STfs *pTfs);
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb); SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
void vnodeClose(SVnode *pVnode); void vnodeClose(SVnode *pVnode);
int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodePreProcessReq(SVnode *pVnode, SRpcMsg *pMsg);
int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp); int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp);
int32_t vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int32_t vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
......
...@@ -240,7 +240,7 @@ struct SVnode { ...@@ -240,7 +240,7 @@ struct SVnode {
SSink* pSink; SSink* pSink;
tsem_t canCommit; tsem_t canCommit;
int64_t sync; int64_t sync;
int32_t syncCount; int32_t blockCount;
tsem_t syncSem; tsem_t syncSem;
SQHandle* pQuery; SQHandle* pQuery;
}; };
......
...@@ -85,6 +85,7 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) { ...@@ -85,6 +85,7 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) {
void tqOffsetClose(STqOffsetStore* pStore) { void tqOffsetClose(STqOffsetStore* pStore) {
tqOffsetSnapshot(pStore); tqOffsetSnapshot(pStore);
taosHashCleanup(pStore->pHash); taosHashCleanup(pStore->pHash);
taosMemoryFree(pStore);
} }
STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey) { STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey) {
......
...@@ -81,7 +81,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { ...@@ -81,7 +81,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
pVnode->state.applied = info.state.committed; pVnode->state.applied = info.state.committed;
pVnode->pTfs = pTfs; pVnode->pTfs = pTfs;
pVnode->msgCb = msgCb; pVnode->msgCb = msgCb;
pVnode->syncCount = 0; pVnode->blockCount = 0;
tsem_init(&pVnode->syncSem, 0, 0); tsem_init(&pVnode->syncSem, 0, 0);
tsem_init(&(pVnode->canCommit), 0, 1); tsem_init(&(pVnode->canCommit), 0, 1);
......
...@@ -28,7 +28,7 @@ static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, vo ...@@ -28,7 +28,7 @@ static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, vo
static int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRpcMsg *pRsp); static int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRpcMsg *pRsp);
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodePreProcessReq(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = 0; int32_t code = 0;
SDecoder dc = {0}; SDecoder dc = {0};
......
...@@ -25,12 +25,12 @@ static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; } ...@@ -25,12 +25,12 @@ static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; }
static inline void vnodeAccumBlockMsg(SVnode *pVnode, tmsg_t type) { static inline void vnodeAccumBlockMsg(SVnode *pVnode, tmsg_t type) {
if (!vnodeIsMsgBlock(type)) return; if (!vnodeIsMsgBlock(type)) return;
int32_t count = atomic_add_fetch_32(&pVnode->syncCount, 1); int32_t count = atomic_add_fetch_32(&pVnode->blockCount, 1);
vTrace("vgId:%d, accum block, count:%d type:%s", pVnode->config.vgId, count, TMSG_INFO(type)); vTrace("vgId:%d, accum block, count:%d type:%s", pVnode->config.vgId, count, TMSG_INFO(type));
} }
static inline void vnodeWaitBlockMsg(SVnode *pVnode) { static inline void vnodeWaitBlockMsg(SVnode *pVnode) {
int32_t count = atomic_load_32(&pVnode->syncCount); int32_t count = atomic_load_32(&pVnode->blockCount);
if (count <= 0) return; if (count <= 0) return;
vTrace("vgId:%d, wait block finish, count:%d", pVnode->config.vgId, count); vTrace("vgId:%d, wait block finish, count:%d", pVnode->config.vgId, count);
...@@ -40,10 +40,10 @@ static inline void vnodeWaitBlockMsg(SVnode *pVnode) { ...@@ -40,10 +40,10 @@ static inline void vnodeWaitBlockMsg(SVnode *pVnode) {
static inline void vnodePostBlockMsg(SVnode *pVnode, tmsg_t type) { static inline void vnodePostBlockMsg(SVnode *pVnode, tmsg_t type) {
if (!vnodeIsMsgBlock(type)) return; if (!vnodeIsMsgBlock(type)) return;
int32_t count = atomic_load_32(&pVnode->syncCount); int32_t count = atomic_load_32(&pVnode->blockCount);
if (count <= 0) return; if (count <= 0) return;
count = atomic_sub_fetch_32(&pVnode->syncCount, 1); count = atomic_sub_fetch_32(&pVnode->blockCount, 1);
vTrace("vgId:%d, post block, count:%d type:%s", pVnode->config.vgId, count, TMSG_INFO(type)); vTrace("vgId:%d, post block, count:%d type:%s", pVnode->config.vgId, count, TMSG_INFO(type));
if (count <= 0) { if (count <= 0) {
tsem_post(&pVnode->syncSem); tsem_post(&pVnode->syncSem);
...@@ -84,8 +84,10 @@ static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -84,8 +84,10 @@ static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
return TSDB_CODE_INVALID_MSG; return TSDB_CODE_INVALID_MSG;
} }
STraceId *trace = &pMsg->info.traceId;
const STraceId *trace = &pMsg->info.traceId;
vGTrace("vgId:%d, start to alter vnode replica to %d, handle:%p", TD_VID(pVnode), req.replica, pMsg->info.handle); vGTrace("vgId:%d, start to alter vnode replica to %d, handle:%p", TD_VID(pVnode), req.replica, pMsg->info.handle);
SSyncCfg cfg = {.replicaNum = req.replica, .myIndex = req.selfIndex}; SSyncCfg cfg = {.replicaNum = req.replica, .myIndex = req.selfIndex};
for (int32_t r = 0; r < req.replica; ++r) { for (int32_t r = 0; r < req.replica; ++r) {
SNodeInfo *pNode = &cfg.nodeInfo[r]; SNodeInfo *pNode = &cfg.nodeInfo[r];
...@@ -126,32 +128,23 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { ...@@ -126,32 +128,23 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
for (int32_t m = 0; m < numOfMsgs; m++) { for (int32_t m = 0; m < numOfMsgs; m++) {
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
STraceId *trace = &pMsg->info.traceId; const STraceId *trace = &pMsg->info.traceId;
vGTrace("vgId:%d, msg:%p get from vnode-write queue handle:%p", vgId, pMsg, pMsg->info.handle); vGTrace("vgId:%d, msg:%p get from vnode-write queue handle:%p", vgId, pMsg, pMsg->info.handle);
code = vnodePreProcessReq(pVnode, pMsg);
if (code != 0) {
vError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr());
} else {
if (pMsg->msgType == TDMT_VND_ALTER_REPLICA) { if (pMsg->msgType == TDMT_VND_ALTER_REPLICA) {
code = vnodeProcessAlterReplicaReq(pVnode, pMsg); code = vnodeProcessAlterReplicaReq(pVnode, pMsg);
} else {
code = vnodePreprocessReq(pVnode, pMsg);
if (code != 0) {
vError("vgId:%d, failed to pre-process msg:%p since %s", vgId, pMsg, terrstr());
} else { } else {
code = syncPropose(pVnode->sync, pMsg, vnodeIsMsgWeak(pMsg->msgType)); code = syncPropose(pVnode->sync, pMsg, vnodeIsMsgWeak(pMsg->msgType));
if (code == 1) { if (code > 0) {
do {
static int32_t cnt = 0;
if (cnt++ % 1000 == 1) {
vInfo("vgId:%d, msg:%p apply right now, apply index:%ld, msgtype:%s,%d", vgId, pMsg,
pMsg->info.conn.applyIndex, TMSG_INFO(pMsg->msgType), pMsg->msgType);
}
} while (0);
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info}; SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
if (vnodeProcessWriteReq(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) { if (vnodeProcessWriteReq(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
rsp.code = terrno; rsp.code = terrno;
vInfo("vgId:%d, msg:%p failed to apply right now since %s", vgId, pMsg, terrstr()); vError("vgId:%d, msg:%p failed to apply right now since %s", vgId, pMsg, terrstr());
} }
if (rsp.info.handle != NULL) { if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} }
...@@ -161,18 +154,10 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { ...@@ -161,18 +154,10 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
if (code == 0) { if (code == 0) {
vnodeAccumBlockMsg(pVnode, pMsg->msgType); vnodeAccumBlockMsg(pVnode, pMsg->msgType);
} else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) { } else if (code < 0) {
if (terrno == TSDB_CODE_SYN_NOT_LEADER) {
SEpSet newEpSet = {0}; SEpSet newEpSet = {0};
syncGetRetryEpSet(pVnode->sync, &newEpSet); syncGetRetryEpSet(pVnode->sync, &newEpSet);
/*
syncGetEpSet(pVnode->sync, &newEpSet);
SEp *pEp = &newEpSet.eps[newEpSet.inUse];
if (pEp->port == tsServerPort && strcmp(pEp->fqdn, tsLocalFqdn) == 0) {
newEpSet.inUse = (newEpSet.inUse + 1) % newEpSet.numOfEps;
}
*/
vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", vgId, pMsg, newEpSet.numOfEps, vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", vgId, pMsg, newEpSet.numOfEps,
newEpSet.inUse); newEpSet.inUse);
for (int32_t i = 0; i < newEpSet.numOfEps; ++i) { for (int32_t i = 0; i < newEpSet.numOfEps; ++i) {
...@@ -182,13 +167,15 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { ...@@ -182,13 +167,15 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info}; SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
tmsgSendRedirectRsp(&rsp, &newEpSet); tmsgSendRedirectRsp(&rsp, &newEpSet);
} else { } else {
if (code != 1) {
if (terrno != 0) code = terrno; if (terrno != 0) code = terrno;
vError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", vgId, pMsg, tstrerror(code), code); vError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", vgId, pMsg, tstrerror(code), code);
SRpcMsg rsp = {.code = code, .info = pMsg->info}; SRpcMsg rsp = {.code = code, .info = pMsg->info};
if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} }
} }
} else {
}
vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, code); vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, code);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
...@@ -206,7 +193,7 @@ void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { ...@@ -206,7 +193,7 @@ void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
STraceId *trace = &pMsg->info.traceId; const STraceId *trace = &pMsg->info.traceId;
vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p", vgId, pMsg, TMSG_INFO(pMsg->msgType), vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p", vgId, pMsg, TMSG_INFO(pMsg->msgType),
pMsg->info.handle); pMsg->info.handle);
...@@ -229,17 +216,24 @@ void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { ...@@ -229,17 +216,24 @@ void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
} }
} }
int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
int32_t ret = 0; int32_t code = 0;
const STraceId *trace = &pMsg->info.traceId;
if (syncEnvIsStart()) { if (!syncEnvIsStart()) {
SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync); vGError("vgId:%d, msg:%p failed to process since sync env not start", pVnode->config.vgId);
assert(pSyncNode != NULL); terrno = TSDB_CODE_APP_ERROR;
return -1;
}
SMsgHead *pHead = pMsg->pCont; SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
STraceId *trace = &pMsg->info.traceId; if (pSyncNode == NULL) {
vGError("vgId:%d, msg:%p failed to process since invalid sync node", pVnode->config.vgId);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
do { #if 1
char *syncNodeStr = sync2SimpleStr(pVnode->sync); char *syncNodeStr = sync2SimpleStr(pVnode->sync);
static int64_t vndTick = 0; static int64_t vndTick = 0;
if (++vndTick % 10 == 1) { if (++vndTick % 10 == 1) {
...@@ -247,154 +241,125 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -247,154 +241,125 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
} }
if (gRaftDetailLog) { if (gRaftDetailLog) {
char logBuf[512] = {0}; char logBuf[512] = {0};
snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, snprintf(logBuf, sizeof(logBuf), "vnode process syncmsg, msgType:%d, syncNode:%s", pMsg->msgType, syncNodeStr);
syncNodeStr);
syncRpcMsgLog2(logBuf, pMsg); syncRpcMsgLog2(logBuf, pMsg);
} }
taosMemoryFree(syncNodeStr); taosMemoryFree(syncNodeStr);
} while (0); #endif
SRpcMsg *pRpcMsg = pMsg;
// ToDo: ugly! use function pointer
// use different strategy
if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_NO_SNAPSHOT) { if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_NO_SNAPSHOT) {
if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) { if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg); SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
ASSERT(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg); code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
syncTimeoutDestroy(pSyncMsg); syncTimeoutDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_PING) {
} else if (pRpcMsg->msgType == TDMT_SYNC_PING) { SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
ret = syncNodeOnPingCb(pSyncNode, pSyncMsg); code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
syncPingDestroy(pSyncMsg); syncPingDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) {
} else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) { SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg); code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
syncPingReplyDestroy(pSyncMsg); syncPingReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
} else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL); code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
syncClientRequestDestroy(pSyncMsg); syncClientRequestDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg); code = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
syncRequestVoteDestroy(pSyncMsg); syncRequestVoteDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg); code = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
syncRequestVoteReplyDestroy(pSyncMsg); syncRequestVoteReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) { SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
ret = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg); code = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
syncAppendEntriesDestroy(pSyncMsg); syncAppendEntriesDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
ret = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg); code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg); syncAppendEntriesReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) {
} else if (pRpcMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) { code = vnodeSetStandBy(pVnode);
ret = vnodeSetStandBy(pVnode); if (code != 0 && terrno != 0) code = terrno;
if (ret != 0 && terrno != 0) ret = terrno; SRpcMsg rsp = {.code = code, .info = pMsg->info};
SRpcMsg rsp = {.code = ret, .info = pMsg->info};
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} else { } else {
vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType); vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg->msgType);
ret = -1; code = -1;
} }
} else { } else {
// use wal first strategy // use wal first strategy
if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) { SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg); code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
syncTimeoutDestroy(pSyncMsg); syncTimeoutDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_PING) {
} else if (pRpcMsg->msgType == TDMT_SYNC_PING) { SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
ret = syncNodeOnPingCb(pSyncNode, pSyncMsg); code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
syncPingDestroy(pSyncMsg); syncPingDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) {
} else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) { SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg); code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
syncPingReplyDestroy(pSyncMsg); syncPingReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
} else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL); code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
syncClientRequestDestroy(pSyncMsg); syncClientRequestDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST_BATCH) {
} else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST_BATCH) { SyncClientRequestBatch *pSyncMsg = syncClientRequestBatchFromRpcMsg(pMsg);
SyncClientRequestBatch *pSyncMsg = syncClientRequestBatchFromRpcMsg(pRpcMsg);
ASSERT(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
ret = syncNodeOnClientRequestBatchCb(pSyncNode, pSyncMsg); code = syncNodeOnClientRequestBatchCb(pSyncNode, pSyncMsg);
syncClientRequestBatchDestroyDeep(pSyncMsg); syncClientRequestBatchDestroyDeep(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg); code = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
syncRequestVoteDestroy(pSyncMsg); syncRequestVoteDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg); code = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
syncRequestVoteReplyDestroy(pSyncMsg); syncRequestVoteReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_BATCH) {
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_BATCH) { SyncAppendEntriesBatch *pSyncMsg = syncAppendEntriesBatchFromRpcMsg2(pMsg);
SyncAppendEntriesBatch *pSyncMsg = syncAppendEntriesBatchFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
ret = syncNodeOnAppendEntriesSnapshot2Cb(pSyncNode, pSyncMsg); code = syncNodeOnAppendEntriesSnapshot2Cb(pSyncNode, pSyncMsg);
syncAppendEntriesBatchDestroy(pSyncMsg); syncAppendEntriesBatchDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
ret = syncNodeOnAppendEntriesReplySnapshot2Cb(pSyncNode, pSyncMsg); code = syncNodeOnAppendEntriesReplySnapshot2Cb(pSyncNode, pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg); syncAppendEntriesReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) {
} else if (pRpcMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) { code = vnodeSetStandBy(pVnode);
ret = vnodeSetStandBy(pVnode); if (code != 0 && terrno != 0) code = terrno;
if (ret != 0 && terrno != 0) ret = terrno; SRpcMsg rsp = {.code = code, .info = pMsg->info};
SRpcMsg rsp = {.code = ret, .info = pMsg->info};
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} else { } else {
vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType); vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg->msgType);
ret = -1; code = -1;
} }
} }
syncNodeRelease(pSyncNode); syncNodeRelease(pSyncNode);
} else { if (code != 0 && terrno == 0) {
vError("==vnodeProcessSyncReq== error syncEnv stop");
ret = -1;
}
if (ret != 0 && terrno == 0) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
} }
return ret; return code;
} }
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
...@@ -427,7 +392,7 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReCon ...@@ -427,7 +392,7 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReCon
syncGetAndDelRespRpc(pVnode->sync, cbMeta.newCfgSeqNum, &rpcMsg.info); syncGetAndDelRespRpc(pVnode->sync, cbMeta.newCfgSeqNum, &rpcMsg.info);
rpcMsg.info.conn.applyIndex = cbMeta.index; rpcMsg.info.conn.applyIndex = cbMeta.index;
STraceId *trace = (STraceId *)&pMsg->info.traceId; const STraceId *trace = (STraceId *)&pMsg->info.traceId;
vGTrace("vgId:%d, alter vnode replica is confirmed, type:%s contLen:%d seq:%" PRIu64 " handle:%p", TD_VID(pVnode), vGTrace("vgId:%d, alter vnode replica is confirmed, type:%s contLen:%d seq:%" PRIu64 " handle:%p", TD_VID(pVnode),
TMSG_INFO(pMsg->msgType), pMsg->contLen, cbMeta.seqNum, rpcMsg.info.handle); TMSG_INFO(pMsg->msgType), pMsg->contLen, cbMeta.seqNum, rpcMsg.info.handle);
if (rpcMsg.info.handle != NULL) { if (rpcMsg.info.handle != NULL) {
...@@ -444,9 +409,8 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c ...@@ -444,9 +409,8 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c
char logBuf[256] = {0}; char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf), snprintf(logBuf, sizeof(logBuf),
"==callback== ==CommitCb== execute, pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, beginIndex :%ld\n", "commitCb execute, pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, beginIndex :%ld\n", pFsm,
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), beginIndex);
beginIndex);
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen}; SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
...@@ -459,16 +423,15 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c ...@@ -459,16 +423,15 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c
static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
char logBuf[256] = {0}; char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf), snprintf(logBuf, sizeof(logBuf), "preCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm,
"==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
} }
static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
char logBuf[256] = {0}; char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", snprintf(logBuf, sizeof(logBuf), "rollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm,
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
} }
......
...@@ -27,6 +27,10 @@ static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT; ...@@ -27,6 +27,10 @@ static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT;
int32_t exchangeObjRefPool = -1; int32_t exchangeObjRefPool = -1;
static void initRefPool() { exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo); } static void initRefPool() { exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo); }
static void cleanupRefPool() {
int32_t ref = atomic_val_compare_exchange_32(&exchangeObjRefPool, exchangeObjRefPool, 0);
taosCloseRef(ref);
}
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan, int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, const char* sql, EOPTR_EXEC_MODEL model) { qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, const char* sql, EOPTR_EXEC_MODEL model) {
...@@ -34,7 +38,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, ...@@ -34,7 +38,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo; SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
taosThreadOnce(&initPoolOnce, initRefPool); taosThreadOnce(&initPoolOnce, initRefPool);
atexit(cleanupRefPool);
int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, sql, model); int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, sql, model);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
......
...@@ -1375,6 +1375,7 @@ static void destroySysScanOperator(void* param, int32_t numOfOutput) { ...@@ -1375,6 +1375,7 @@ static void destroySysScanOperator(void* param, int32_t numOfOutput) {
} }
taosArrayDestroy(pInfo->scanCols); taosArrayDestroy(pInfo->scanCols);
taosMemoryFreeClear(pInfo->pUser);
} }
static int32_t getSysTableDbNameColId(const char* pTable) { static int32_t getSysTableDbNameColId(const char* pTable) {
......
python3 .\test.py -f 0-others\taosShell.py @REM python3 .\test.py -f 0-others\taosShell.py
python3 .\test.py -f 0-others\taosShellError.py python3 .\test.py -f 0-others\taosShellError.py
python3 .\test.py -f 0-others\taosShellNetChk.py python3 .\test.py -f 0-others\taosShellNetChk.py
python3 .\test.py -f 0-others\telemetry.py python3 .\test.py -f 0-others\telemetry.py
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册