提交 a9d13928 编写于 作者: S Shengliang Guan

fix bug in transaction

上级 34619844
...@@ -479,9 +479,11 @@ static int32_t dndDropMnode(SDnode *pDnode) { ...@@ -479,9 +479,11 @@ static int32_t dndDropMnode(SDnode *pDnode) {
return -1; return -1;
} }
dndReleaseMnode(pDnode, pMnode);
dndStopMnodeWorker(pDnode); dndStopMnodeWorker(pDnode);
dndWriteMnodeFile(pDnode); dndWriteMnodeFile(pDnode);
mndClose(pMnode); mndClose(pMnode);
pMgmt->pMnode = NULL;
mndDestroy(pDnode->dir.mnode); mndDestroy(pDnode->dir.mnode);
return 0; return 0;
...@@ -661,6 +663,8 @@ void dndProcessMnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet) { ...@@ -661,6 +663,8 @@ void dndProcessMnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet) {
rpcFreeCont(pRpcMsg->pCont); rpcFreeCont(pRpcMsg->pCont);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
dndReleaseMnode(pDnode, pMnode);
} }
void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
...@@ -945,7 +949,7 @@ void dndCleanupMnode(SDnode *pDnode) { ...@@ -945,7 +949,7 @@ void dndCleanupMnode(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
dInfo("dnode-mnode start to clean up"); dInfo("dnode-mnode start to clean up");
dndStopMnodeWorker(pDnode); if (pMgmt->pMnode) dndStopMnodeWorker(pDnode);
dndCleanupMnodeMgmtWorker(pDnode); dndCleanupMnodeMgmtWorker(pDnode);
dndFreeMnodeMgmtQueue(pDnode); dndFreeMnodeMgmtQueue(pDnode);
tfree(pMgmt->file); tfree(pMgmt->file);
......
...@@ -136,7 +136,7 @@ static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -136,7 +136,7 @@ static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
DndMsgFp fp = pMgmt->msgFp[msgType]; DndMsgFp fp = pMgmt->msgFp[msgType];
if (fp != NULL) { if (fp != NULL) {
(*fp)(pDnode, pMsg, pEpSet); (*fp)(pDnode, pMsg, pEpSet);
dTrace("RPC %p, rsp:%s is processed, code:0x%0X", pMsg->handle, taosMsg[msgType], pMsg->code & 0XFFFF); dTrace("RPC %p, rsp:%s is processed, code:0x%x", pMsg->handle, taosMsg[msgType], pMsg->code & 0XFFFF);
} else { } else {
dError("RPC %p, rsp:%s not processed", pMsg->handle, taosMsg[msgType]); dError("RPC %p, rsp:%s not processed", pMsg->handle, taosMsg[msgType]);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
...@@ -184,7 +184,7 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -184,7 +184,7 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) {
int32_t msgType = pMsg->msgType; int32_t msgType = pMsg->msgType;
if (msgType == TSDB_MSG_TYPE_NETWORK_TEST) { if (msgType == TSDB_MSG_TYPE_NETWORK_TEST) {
dTrace("RPC %p, network test req, app:%p will be processed", pMsg->handle, pMsg->ahandle); dTrace("RPC %p, network test req, app:%p will be processed, code:0x%x", pMsg->handle, pMsg->ahandle, pMsg->code);
dndProcessDnodeReq(pDnode, pMsg, pEpSet); dndProcessDnodeReq(pDnode, pMsg, pEpSet);
return; return;
} }
......
...@@ -140,6 +140,28 @@ TEST_F(DndTestMnode, 04_Create_Mnode) { ...@@ -140,6 +140,28 @@ TEST_F(DndTestMnode, 04_Create_Mnode) {
CheckTimestamp(); CheckTimestamp();
CheckTimestamp(); CheckTimestamp();
} }
{
// drop mnode
int32_t contLen = sizeof(SDropMnodeMsg);
SDropMnodeMsg* pReq = (SDropMnodeMsg*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_DROP_MNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
test.SendShowMetaMsg(TSDB_MGMT_TABLE_MNODE, "");
test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 1);
CheckInt16(1);
CheckBinary("localhost:9061", TSDB_EP_LEN);
CheckBinary("master", 12);
CheckInt64(0);
CheckTimestamp();
}
} }
// { // {
// int32_t contLen = sizeof(SDropDnodeMsg); // int32_t contLen = sizeof(SDropDnodeMsg);
......
...@@ -290,11 +290,6 @@ typedef struct SMnodeMsg { ...@@ -290,11 +290,6 @@ typedef struct SMnodeMsg {
char db[TSDB_FULL_DB_NAME_LEN]; char db[TSDB_FULL_DB_NAME_LEN];
int32_t acctId; int32_t acctId;
SMnode *pMnode; SMnode *pMnode;
int16_t received;
int16_t successed;
int16_t expected;
int16_t retry;
int32_t code;
int64_t createdTime; int64_t createdTime;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
int32_t contLen; int32_t contLen;
......
...@@ -828,9 +828,9 @@ static int32_t mndProcessUseDbMsg(SMnodeMsg *pMsg) { ...@@ -828,9 +828,9 @@ static int32_t mndProcessUseDbMsg(SMnodeMsg *pMsg) {
static int32_t mndProcessSyncDbMsg(SMnodeMsg *pMsg) { static int32_t mndProcessSyncDbMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pMsg->pMnode;
SSyncDbMsg *pSync = pMsg->rpcMsg.pCont; SSyncDbMsg *pSync = pMsg->rpcMsg.pCont;
SDbObj *pDb = mndAcquireDb(pMnode, pMsg->db); SDbObj *pDb = mndAcquireDb(pMnode, pSync->db);
if (pDb == NULL) { if (pDb == NULL) {
mError("db:%s, failed to process sync db msg since %s", pMsg->db, terrstr()); mError("db:%s, failed to process sync db msg since %s", pSync->db, terrstr());
return -1; return -1;
} }
...@@ -841,9 +841,9 @@ static int32_t mndProcessSyncDbMsg(SMnodeMsg *pMsg) { ...@@ -841,9 +841,9 @@ static int32_t mndProcessSyncDbMsg(SMnodeMsg *pMsg) {
static int32_t mndProcessCompactDbMsg(SMnodeMsg *pMsg) { static int32_t mndProcessCompactDbMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pMsg->pMnode;
SCompactDbMsg *pCompact = pMsg->rpcMsg.pCont; SCompactDbMsg *pCompact = pMsg->rpcMsg.pCont;
SDbObj *pDb = mndAcquireDb(pMnode, pMsg->db); SDbObj *pDb = mndAcquireDb(pMnode, pCompact->db);
if (pDb == NULL) { if (pDb == NULL) {
mError("db:%s, failed to process compact db msg since %s", pMsg->db, terrstr()); mError("db:%s, failed to process compact db msg since %s", pCompact->db, terrstr());
return -1; return -1;
} }
......
...@@ -440,6 +440,8 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode ...@@ -440,6 +440,8 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
sdbRelease(pSdb, pMObj); sdbRelease(pSdb, pMObj);
} }
alterMsg.replica = numOfReplicas;
while (1) { while (1) {
SMnodeObj *pMObj = NULL; SMnodeObj *pMObj = NULL;
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj); pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
...@@ -506,12 +508,12 @@ static int32_t mndDropMnode(SMnode *pMnode, SMnodeMsg *pMsg, SMnodeObj *pObj) { ...@@ -506,12 +508,12 @@ static int32_t mndDropMnode(SMnode *pMnode, SMnodeMsg *pMsg, SMnodeObj *pObj) {
mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id); mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id);
if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, pObj) != 0) { if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) {
mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr());
goto DROP_MNODE_OVER; goto DROP_MNODE_OVER;
} }
if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, pObj) != 0) { if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) {
mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
goto DROP_MNODE_OVER; goto DROP_MNODE_OVER;
} }
......
...@@ -625,7 +625,7 @@ void mndTransHandleActionRsp(SMnodeMsg *pMsg) { ...@@ -625,7 +625,7 @@ void mndTransHandleActionRsp(SMnodeMsg *pMsg) {
pAction->errCode = pMsg->rpcMsg.code; pAction->errCode = pMsg->rpcMsg.code;
} }
mDebug("trans:%d, action:%d response is received, code:0x%x", transId, action, pMsg->code); mDebug("trans:%d, action:%d response is received, code:0x%x", transId, action, pMsg->rpcMsg.code);
mndTransExecute(pMnode, pTrans); mndTransExecute(pMnode, pTrans);
HANDLE_ACTION_RSP_OVER: HANDLE_ACTION_RSP_OVER:
...@@ -696,7 +696,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA ...@@ -696,7 +696,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
for (int32_t action = 0; action < numOfActions; ++action) { for (int32_t action = 0; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pArray, action); STransAction *pAction = taosArrayGet(pArray, action);
if (pAction == NULL) continue; if (pAction == NULL) continue;
if (pAction->msgSent) continue; if (pAction->msgReceived && pAction->errCode == 0) continue;
int64_t signature = pTrans->id; int64_t signature = pTrans->id;
signature = (signature << 32); signature = (signature << 32);
...@@ -736,6 +736,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA ...@@ -736,6 +736,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
terrno = errorCode; terrno = errorCode;
return errorCode; return errorCode;
} else { } else {
mDebug("trans:%d, %d of %d actions executed, code:0x%x", pTrans->id, numOfReceivedMsgs, numOfActions, errorCode);
return TSDB_CODE_MND_ACTION_IN_PROGRESS; return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
} }
......
...@@ -38,6 +38,7 @@ int32_t tWorkerInit(SWorkerPool *pool) { ...@@ -38,6 +38,7 @@ int32_t tWorkerInit(SWorkerPool *pool) {
void tWorkerCleanup(SWorkerPool *pool) { void tWorkerCleanup(SWorkerPool *pool) {
for (int i = 0; i < pool->max; ++i) { for (int i = 0; i < pool->max; ++i) {
SWorker *worker = pool->workers + i; SWorker *worker = pool->workers + i;
if (worker == NULL) continue;
if (taosCheckPthreadValid(worker->thread)) { if (taosCheckPthreadValid(worker->thread)) {
taosQsetThreadResume(pool->qset); taosQsetThreadResume(pool->qset);
} }
...@@ -45,6 +46,7 @@ void tWorkerCleanup(SWorkerPool *pool) { ...@@ -45,6 +46,7 @@ void tWorkerCleanup(SWorkerPool *pool) {
for (int i = 0; i < pool->max; ++i) { for (int i = 0; i < pool->max; ++i) {
SWorker *worker = pool->workers + i; SWorker *worker = pool->workers + i;
if (worker == NULL) continue;
if (taosCheckPthreadValid(worker->thread)) { if (taosCheckPthreadValid(worker->thread)) {
pthread_join(worker->thread, NULL); pthread_join(worker->thread, NULL);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册