diff --git a/source/dnode/mgmt/mgmt_snode/src/smWorker.c b/source/dnode/mgmt/mgmt_snode/src/smWorker.c index dbd081338acc3470915d03f1682e2f8b8a72bf6e..1381d4c39180866e26f57a5b2462f1b00aa3d063 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smWorker.c +++ b/source/dnode/mgmt/mgmt_snode/src/smWorker.c @@ -140,6 +140,9 @@ int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { if (pSnode == NULL) { dError("snode: msg:%p failed to put into vnode queue since %s, type:%s qtype:%d", pMsg, terrstr(), TMSG_INFO(pMsg->msgType), qtype); + taosFreeQitem(pMsg); + rpcFreeCont(pRpc->pCont); + pRpc->pCont = NULL; return -1; } diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 0f50391ac597a76510b867370b8e235f92e672ba..abb23bfb89d7406c5ebab62eff4ecb9b1f5c9a88 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -282,6 +282,8 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { pMgmt->errCode = 0; SRpcMsg req = {.msgType = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)}; + if (req.contLen <= 0) return -1; + req.pCont = rpcMallocCont(req.contLen); if (req.pCont == NULL) return -1; memcpy(req.pCont, pRaw, req.contLen); diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 6a606a1a7ef2e0e758eba3e8f52705315c514f6b..ac05598bdc37b63c5dd80c82dfd1becc58452fc9 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -375,7 +375,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto _OVER; action.pCont = NULL; } else { - if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER; + if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto _OVER; } } diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 72a4621eae02ebf99d3fa2de65655496bb710a42..e00d0d955ec820176cb696fa652c5e7fdd374edb 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -325,10 +325,10 @@ static void *mndBuildAlterVnodeConfigReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pV static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId, int32_t *pContLen) { SAlterVnodeReplicaReq alterReq = { - alterReq.vgId = pVgroup->vgId, - alterReq.strict = pDb->cfg.strict, - alterReq.replica = pVgroup->replica, - alterReq.selfIndex = -1, + .vgId = pVgroup->vgId, + .strict = pDb->cfg.strict, + .replica = pVgroup->replica, + .selfIndex = -1, }; for (int32_t v = 0; v < pVgroup->replica; ++v) { diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 47b8614717152ea7b275b88cf354ac3ce4aa9666..8a3047ae32d426e27606ce9124ea0a32a7ef6b71 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -768,7 +768,7 @@ char* sync2SimpleStr(int64_t rid) { sTrace("syncSetRpc get pSyncNode is NULL, rid:%" PRId64, rid); return NULL; } - ASSERT(rid == pSyncNode->rid); + char* s = syncNode2SimpleStr(pSyncNode); syncNodeRelease(pSyncNode); @@ -778,11 +778,9 @@ char* sync2SimpleStr(int64_t rid) { int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) { SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { - syncNodeRelease(pSyncNode); terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; } - ASSERT(rid == pSyncNode->rid); int32_t ret = syncNodePropose(pSyncNode, pMsg, isWeak); syncNodeRelease(pSyncNode); @@ -3108,15 +3106,15 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p if (ths->pFsm->FpLeaderTransferCb != NULL) { SFsmCbMeta cbMeta = { - cbMeta.code = 0, - cbMeta.currentTerm = ths->pRaftStore->currentTerm, - cbMeta.flag = 0, - cbMeta.index = pEntry->index, - cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index), - cbMeta.isWeak = pEntry->isWeak, - cbMeta.seqNum = pEntry->seqNum, - cbMeta.state = ths->state, - cbMeta.term = pEntry->term, + .code = 0, + .currentTerm = ths->pRaftStore->currentTerm, + .flag = 0, + .index = pEntry->index, + .lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index), + .isWeak = pEntry->isWeak, + .seqNum = pEntry->seqNum, + .state = ths->state, + .term = pEntry->term, }; ths->pFsm->FpLeaderTransferCb(ths->pFsm, pRpcMsg, &cbMeta); } diff --git a/source/libs/sync/src/syncRespMgr.c b/source/libs/sync/src/syncRespMgr.c index 35c831b52f9bc9679739271eeed564f855e6564b..c31dede0b3d1aba536834c81168cab6a08e5654c 100644 --- a/source/libs/sync/src/syncRespMgr.c +++ b/source/libs/sync/src/syncRespMgr.c @@ -146,15 +146,15 @@ void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) { cnt++; SFsmCbMeta cbMeta = { - cbMeta.index = SYNC_INDEX_INVALID, - cbMeta.lastConfigIndex = SYNC_INDEX_INVALID, - cbMeta.isWeak = false, - cbMeta.code = TSDB_CODE_SYN_TIMEOUT, - cbMeta.state = pSyncNode->state, - cbMeta.seqNum = *pSeqNum, - cbMeta.term = SYNC_TERM_INVALID, - cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm, - cbMeta.flag = 0, + .index = SYNC_INDEX_INVALID, + .lastConfigIndex = SYNC_INDEX_INVALID, + .isWeak = false, + .code = TSDB_CODE_SYN_TIMEOUT, + .state = pSyncNode->state, + .seqNum = *pSeqNum, + .term = SYNC_TERM_INVALID, + .currentTerm = pSyncNode->pRaftStore->currentTerm, + .flag = 0, }; pStub->rpcMsg.pCont = NULL; diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 5971033bf8610e4ad850ce177ee971cd3a006eb1..863cee9b08ced50308afc11db48a5f15c49f8162 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -246,9 +246,6 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) { pool->nextId = (pool->nextId + 1) % pool->max; } - while (worker->pid <= 0) taosMsleep(10); - queue->threadId = worker->pid; - uInfo("worker:%s, queue:%p is allocated, ahandle:%p thread:%08" PRId64, pool->name, queue, ahandle, queue->threadId); code = 0; _OVER: @@ -260,6 +257,9 @@ _OVER: if (worker->qall != NULL) taosFreeQall(worker->qall); return NULL; } else { + while (worker->pid <= 0) taosMsleep(10); + queue->threadId = worker->pid; + uInfo("worker:%s, queue:%p is allocated, ahandle:%p thread:%08" PRId64, pool->name, queue, ahandle, queue->threadId); return queue; } }