diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 285e079b3ec90a066cd70fa3e7576dac3d5c8b8d..58c9b308901f6e9ed1bc8e871dd25997efe2fd15 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -132,7 +132,7 @@ typedef struct SSyncFSM { void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpRestoreFinishCb)(struct SSyncFSM* pFsm); - void (*FpReConfigCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta); + void (*FpReConfigCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta *cbMeta); void (*FpLeaderTransferCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpBecomeLeaderCb)(struct SSyncFSM* pFsm); diff --git a/source/dnode/mgmt/node_mgmt/src/dmProc.c b/source/dnode/mgmt/node_mgmt/src/dmProc.c index cbf13924d73e9f2c6b5366e9f6be6254062e84b3..bb71044dd6eda5a03d42de3dd5efeb6033dd16a5 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmProc.c +++ b/source/dnode/mgmt/node_mgmt/src/dmProc.c @@ -74,11 +74,14 @@ static SProcQueue *dmInitProcQueue(SProc *proc, char *ptr, int32_t size) { } tstrncpy(queue->name, proc->name, sizeof(queue->name)); + + taosThreadMutexLock(&queue->mutex); queue->head = 0; queue->tail = 0; queue->total = bufSize; queue->avail = bufSize; queue->items = 0; + taosThreadMutexUnlock(&queue->mutex); } return queue; diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index f57943b9ddca9f6e33421e2a675f3e5cdc10cdbd..e83f1f7cabf67702e32da0a03becfe70296445d9 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -301,7 +301,7 @@ int32_t dmInitServer(SDnode *pDnode) { SDnodeTrans *pTrans = &pDnode->trans; SRpcInit rpcInit = {0}; - strncpy(rpcInit.localFqdn, tsLocalFqdn, TSDB_FQDN_LEN); + tstrncpy(rpcInit.localFqdn, tsLocalFqdn, TSDB_FQDN_LEN); rpcInit.localPort = tsServerPort; rpcInit.label = "DND-S"; rpcInit.numOfThreads = tsNumOfRpcThreads; diff --git a/source/dnode/mnode/impl/src/mndAcct.c b/source/dnode/mnode/impl/src/mndAcct.c index 5279a86d83c3070242c8e8280f875005a399a421..65857367f99e7a91816177e4d1c1dddf923b74ee 100644 --- a/source/dnode/mnode/impl/src/mndAcct.c +++ b/source/dnode/mnode/impl/src/mndAcct.c @@ -83,6 +83,7 @@ static int32_t mndCreateDefaultAcct(SMnode *pMnode) { STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, NULL, "create-acct"); if (pTrans == NULL) { + sdbFreeRaw(pRaw); mError("acct:%s, failed to create since %s", acctObj.acct, terrstr()); return -1; } diff --git a/source/dnode/mnode/impl/src/mndCluster.c b/source/dnode/mnode/impl/src/mndCluster.c index 70c93748210605cb13493b9f8ab11c8dc2e9793f..96748b3620c9c2171e10f585fb71b478706d051e 100644 --- a/source/dnode/mnode/impl/src/mndCluster.c +++ b/source/dnode/mnode/impl/src/mndCluster.c @@ -315,7 +315,7 @@ static int32_t mndProcessUptimeTimer(SRpcMsg *pReq) { return 0; } - mInfo("update cluster uptime to %" PRId64, clusterObj.upTime); + mInfo("update cluster uptime to %d", clusterObj.upTime); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "update-uptime"); if (pTrans == NULL) return -1; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 3655289f99268a469809a50a6704e13077fc08e9..aabde3fed2de9d509ef396ccd865b86727ae8a51 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -68,7 +68,7 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM if (pMgmt->errCode != 0) { mError("trans:%d, failed to propose since %s, post sem", transId, tstrerror(pMgmt->errCode)); } else { - mInfo("trans:%d, is proposed and post sem", transId, tstrerror(pMgmt->errCode)); + mInfo("trans:%d, is proposed and post sem", transId); } pMgmt->transId = 0; taosWUnLockLatch(&pMgmt->lock); @@ -113,13 +113,13 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) { } } -void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbMeta) { +void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta *cbMeta) { SMnode *pMnode = pFsm->data; SSyncMgmt *pMgmt = &pMnode->syncMgmt; - pMgmt->errCode = cbMeta.code; + pMgmt->errCode = cbMeta->code; mInfo("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64, pMgmt->transId, - cbMeta.code, cbMeta.index, cbMeta.term); + cbMeta->code, cbMeta->index, cbMeta->term); taosWLockLatch(&pMgmt->lock); if (pMgmt->transId == -1) { @@ -127,7 +127,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM mError("trans:-1, failed to propose sync reconfig since %s, post sem", tstrerror(pMgmt->errCode)); } else { mInfo("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64 " post sem", - pMgmt->transId, cbMeta.code, cbMeta.index, cbMeta.term); + pMgmt->transId, cbMeta->code, cbMeta->index, cbMeta->term); } pMgmt->transId = 0; tsem_post(&pMgmt->syncSem); @@ -271,6 +271,11 @@ void mndCleanupSync(SMnode *pMnode) { int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; SRpcMsg req = {.msgType = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)}; + if (req.contLen <= 0) { + terrno = TSDB_CODE_APP_ERROR; + return -1; + } + req.pCont = rpcMallocCont(req.contLen); if (req.pCont == NULL) return -1; memcpy(req.pCont, pRaw, req.contLen); @@ -278,7 +283,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { pMgmt->errCode = 0; taosWLockLatch(&pMgmt->lock); if (pMgmt->transId != 0) { - mError("trans:%d, can't be proposed since trans:%s alrady waiting for confirm", transId, pMgmt->transId); + mError("trans:%d, can't be proposed since trans:%d alrady waiting for confirm", transId, pMgmt->transId); taosWUnLockLatch(&pMgmt->lock); terrno = TSDB_CODE_APP_NOT_READY; return -1; diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 7308dc375ea37cea765b8016b5006797320e27e6..ce195454f8d6746dce07c33fe92e15abb85a50e4 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -776,7 +776,7 @@ static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl SName n; int32_t cols = 0; - char topicName[TSDB_TOPIC_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; + char topicName[TSDB_TOPIC_NAME_LEN + VARSTR_HEADER_SIZE + 5] = {0}; strcpy(varDataVal(topicName), mndGetDbStr(pTopic->name)); /*tNameFromString(&n, pTopic->name, T_NAME_ACCT | T_NAME_DB);*/ /*tNameGetDbName(&n, varDataVal(topicName));*/ diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 9613d924c460a7b3a89fb17accd6abdbb698ed2c..455b71ace9fdc52bd4b1fc8a3e04d363273ed23c 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -427,7 +427,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { terrno = 0; _OVER: - if (terrno != 0) { + if (terrno != 0 && pTrans != NULL) { mError("trans:%d, failed to parse from raw:%p since %s", pTrans->id, pRaw, terrstr()); mndTransDropData(pTrans); taosMemoryFreeClear(pRow); @@ -629,6 +629,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict, pTrans->pRpcArray == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; mError("failed to create transaction since %s", terrstr()); + mndTransDrop(pTrans); return NULL; } @@ -1428,6 +1429,7 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans) { SSdbRaw *pRaw = mndTransActionEncode(pTrans); if (pRaw == NULL) { mError("trans:%d, failed to encode while finish trans since %s", pTrans->id, terrstr()); + return false; } sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED); @@ -1617,7 +1619,7 @@ static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl colDataAppend(pColInfo, numOfRows, (const char *)&pTrans->lastExecTime, false); char lastInfo[TSDB_TRANS_ERROR_LEN + VARSTR_HEADER_SIZE] = {0}; - char detail[TSDB_TRANS_ERROR_LEN] = {0}; + char detail[TSDB_TRANS_ERROR_LEN + 1] = {0}; int32_t len = snprintf(detail, sizeof(detail), "action:%d code:0x%x(%s) ", pTrans->lastAction, pTrans->lastErrorNo & 0xFFFF, tstrerror(pTrans->lastErrorNo)); SEpSet epset = pTrans->lastEpset; diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 5512fa410788a3eaea4adc4e5b08759a4568a999..fecdfb12ba2b8712794ad13e3ec19e7d33441e10 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -83,6 +83,7 @@ static int32_t mndCreateDefaultUser(SMnode *pMnode, char *acct, char *user, char STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, NULL, "create-user"); if (pTrans == NULL) { + sdbFreeRaw(pRaw); mError("user:%s, failed to create since %s", userObj.user, terrstr()); return -1; } diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index c1bc3408199ed92a7a101d04f66aa54f3ef47df2..a0f01f41d89496c7b6036e960e49b3b10537b9e2 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -1250,7 +1250,7 @@ static int32_t mndRedistributeVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64, pVgroup->dbName, pVgroup->vgId, vgMem, pNew1->id, pNew1->memAvail, pNew1->memUsed); terrno = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE; - return -1; + goto _OVER; } else { pNew1->memUsed += vgMem; } @@ -1272,7 +1272,7 @@ static int32_t mndRedistributeVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64, pVgroup->dbName, pVgroup->vgId, vgMem, pNew2->id, pNew2->memAvail, pNew2->memUsed); terrno = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE; - return -1; + goto _OVER; } else { pNew2->memUsed += vgMem; } @@ -1293,7 +1293,7 @@ static int32_t mndRedistributeVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64, pVgroup->dbName, pVgroup->vgId, vgMem, pNew3->id, pNew3->memAvail, pNew3->memUsed); terrno = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE; - return -1; + goto _OVER; } else { pNew3->memUsed += vgMem; } @@ -1627,7 +1627,7 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj } else if (newVg1.replica == 3) { SVnodeGid del1 = {0}; if (mndRemoveVnodeFromVgroup(pMnode, &newVg1, pArray, &del1) != 0) goto _OVER; - if (mndAddSetVnodeStandByAction(pMnode, pTrans, pDb, pVgroup, &del1, true) != 0) return -1; + if (mndAddSetVnodeStandByAction(pMnode, pTrans, pDb, pVgroup, &del1, true) != 0) goto _OVER; if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVg1, TDMT_VND_ALTER_REPLICA) != 0) goto _OVER; if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg1, &del1, true) != 0) goto _OVER; if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER; diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 980761cd145b7f27ae96cc00d21259083cdefba9..aa3f72d281a0c59c9e317b1bbc11930c44c8916b 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -496,16 +496,16 @@ static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) { return 0; } -static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbMeta) { +static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta *cbMeta) { SVnode *pVnode = pFsm->data; SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen}; - syncGetAndDelRespRpc(pVnode->sync, cbMeta.newCfgSeqNum, &rpcMsg.info); - rpcMsg.info.conn.applyIndex = cbMeta.index; + syncGetAndDelRespRpc(pVnode->sync, cbMeta->newCfgSeqNum, &rpcMsg.info); + rpcMsg.info.conn.applyIndex = cbMeta->index; const STraceId *trace = (STraceId *)&pMsg->info.traceId; vGTrace("vgId:%d, alter vnode replica is confirmed, type:%s contLen:%d seq:%" PRIu64 " handle:%p", TD_VID(pVnode), - TMSG_INFO(pMsg->msgType), pMsg->contLen, cbMeta.seqNum, rpcMsg.info.handle); + TMSG_INFO(pMsg->msgType), pMsg->contLen, cbMeta->seqNum, rpcMsg.info.handle); if (rpcMsg.info.handle != NULL) { tmsgSendRsp(&rpcMsg); } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 52feb625a8d1fb7bf2eddba814859e064fac89fe..76fd345bdd1226a960c13c94de9d86ea7d954f1e 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2942,7 +2942,7 @@ static int32_t syncNodeConfigChangeFinish(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyn cbMeta.newCfgTerm = pFinish->newCfgTerm; cbMeta.newCfgSeqNum = pFinish->newCfgSeqNum; - ths->pFsm->FpReConfigCb(ths->pFsm, pRpcMsg, cbMeta); + ths->pFsm->FpReConfigCb(ths->pFsm, pRpcMsg, &cbMeta); } // clear changing diff --git a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp index 714d233984072890a269b66f0062ae21e1f539e2..fbfc4cda8eb01de93b8d50c3002b1d59c97faaa9 100644 --- a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp +++ b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp @@ -149,9 +149,9 @@ int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_ void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); } -void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta) { +void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta *cbMeta) { sTrace("==callback== ==ReConfigCb== flag:0x%lX, index:%" PRId64 ", code:%d, currentTerm:%" PRIu64 ", term:%" PRIu64, - cbMeta.flag, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term); + cbMeta->flag, cbMeta->index, cbMeta->code, cbMeta->currentTerm, cbMeta->term); } SSyncFSM* createFsm() { diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp index ba3fc7765078e5123cf8992eb79c170c8b52de3e..cfab3b6ae37133155cc8b2de91edf44abd33a76c 100644 --- a/source/libs/sync/test/syncConfigChangeTest.cpp +++ b/source/libs/sync/test/syncConfigChangeTest.cpp @@ -80,9 +80,9 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); } -void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta) { +void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta *cbMeta) { sTrace("==callback== ==ReConfigCb== flag:0x%lX, index:%" PRId64 ", code:%d, currentTerm:%" PRIu64 ", term:%" PRIu64, - cbMeta.flag, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term); + cbMeta->flag, cbMeta->index, cbMeta->code, cbMeta->currentTerm, cbMeta->term); } SSyncFSM* createFsm() { diff --git a/source/libs/sync/test/syncTestTool.cpp b/source/libs/sync/test/syncTestTool.cpp index 9e9769224f18ac29297e8936e7f93fab817612d5..e718d37376c7aa15a131156b638fb12ea11319ca 100644 --- a/source/libs/sync/test/syncTestTool.cpp +++ b/source/libs/sync/test/syncTestTool.cpp @@ -153,11 +153,11 @@ int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_ void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb== pFsm:%p", pFsm); } -void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta) { +void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta* cbMeta) { char* s = syncCfg2Str(&(cbMeta.newCfg)); sTrace("==callback== ==ReConfigCb== flag:0x%lX, index:%" PRId64 ", code:%d, currentTerm:%" PRIu64 ", term:%" PRIu64 ", newCfg:%s", - cbMeta.flag, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term, s); + cbMeta->flag, cbMeta->index, cbMeta->code, cbMeta->currentTerm, cbMeta->term, s); taosMemoryFree(s); }