提交 3152824a 编写于 作者: S Shengliang Guan

return uid when deleting db

上级 001cab2d
...@@ -506,21 +506,26 @@ typedef struct { ...@@ -506,21 +506,26 @@ typedef struct {
} SAlterDbReq; } SAlterDbReq;
typedef struct { typedef struct {
char db[TSDB_TABLE_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
int8_t ignoreNotExists; int8_t ignoreNotExists;
} SDropDbReq; } SDropDbReq;
typedef struct { typedef struct {
char db[TSDB_TABLE_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
uint64_t uid;
} SDropDbRsp;
typedef struct {
char db[TSDB_DB_FNAME_LEN];
int32_t vgVersion; int32_t vgVersion;
} SUseDbReq; } SUseDbReq;
typedef struct { typedef struct {
char db[TSDB_TABLE_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
} SSyncDbReq; } SSyncDbReq;
typedef struct { typedef struct {
char db[TSDB_TABLE_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
} SCompactDbReq; } SCompactDbReq;
typedef struct { typedef struct {
......
...@@ -128,6 +128,8 @@ typedef struct { ...@@ -128,6 +128,8 @@ typedef struct {
int32_t failedTimes; int32_t failedTimes;
void* rpcHandle; void* rpcHandle;
void* rpcAHandle; void* rpcAHandle;
void* rpcRsp;
int32_t rpcRspLen;
SArray* redoLogs; SArray* redoLogs;
SArray* undoLogs; SArray* undoLogs;
SArray* commitLogs; SArray* commitLogs;
......
...@@ -36,16 +36,17 @@ typedef struct { ...@@ -36,16 +36,17 @@ typedef struct {
int32_t mndInitTrans(SMnode *pMnode); int32_t mndInitTrans(SMnode *pMnode);
void mndCleanupTrans(SMnode *pMnode); void mndCleanupTrans(SMnode *pMnode);
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, SRpcMsg *pMsg); STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, const SRpcMsg *pReq);
void mndTransDrop(STrans *pTrans); void mndTransDrop(STrans *pTrans);
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction); int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction);
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction); int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
void mndTransProcessRsp(SMnodeMsg *pMsg); void mndTransProcessRsp(SMnodeMsg *pRsp);
void mndTransPullup(SMnode *pMnode); void mndTransPullup(SMnode *pMnode);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -767,6 +767,14 @@ static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pReq, SDbObj *pDb) { ...@@ -767,6 +767,14 @@ static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pReq, SDbObj *pDb) {
if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER; if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER;
if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER; if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER;
if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER; if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER;
int32_t rspLen = sizeof(SDropDbRsp);
SDropDbRsp *pRsp = rpcMallocCont(rspLen);
if (pRsp == NULL) goto DROP_DB_OVER;
memcpy(pRsp->db, pDb->name, TSDB_DB_FNAME_LEN);
pRsp->uid = htobe64(pDb->uid);
mndTransSetRpcRsp(pTrans, pRsp, rspLen);
if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_DB_OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_DB_OVER;
code = 0; code = 0;
......
...@@ -308,6 +308,11 @@ static void mndTransDropData(STrans *pTrans) { ...@@ -308,6 +308,11 @@ static void mndTransDropData(STrans *pTrans) {
mndTransDropLogs(pTrans->commitLogs); mndTransDropLogs(pTrans->commitLogs);
mndTransDropActions(pTrans->redoActions); mndTransDropActions(pTrans->redoActions);
mndTransDropActions(pTrans->undoActions); mndTransDropActions(pTrans->undoActions);
if (pTrans->rpcRsp != NULL) {
rpcFreeCont(pTrans->rpcRsp);
pTrans->rpcRsp = NULL;
pTrans->rpcRspLen = 0;
}
} }
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) { static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) {
...@@ -339,7 +344,7 @@ void mndReleaseTrans(SMnode *pMnode, STrans *pTrans) { ...@@ -339,7 +344,7 @@ void mndReleaseTrans(SMnode *pMnode, STrans *pTrans) {
sdbRelease(pSdb, pTrans); sdbRelease(pSdb, pTrans);
} }
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, SRpcMsg *pMsg) { STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, const SRpcMsg *pReq) {
STrans *pTrans = calloc(1, sizeof(STrans)); STrans *pTrans = calloc(1, sizeof(STrans));
if (pTrans == NULL) { if (pTrans == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -350,8 +355,8 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, SRpcMsg *pMsg) { ...@@ -350,8 +355,8 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, SRpcMsg *pMsg) {
pTrans->id = sdbGetMaxId(pMnode->pSdb, SDB_TRANS); pTrans->id = sdbGetMaxId(pMnode->pSdb, SDB_TRANS);
pTrans->stage = TRN_STAGE_PREPARE; pTrans->stage = TRN_STAGE_PREPARE;
pTrans->policy = policy; pTrans->policy = policy;
pTrans->rpcHandle = pMsg->handle; pTrans->rpcHandle = pReq->handle;
pTrans->rpcAHandle = pMsg->ahandle; pTrans->rpcAHandle = pReq->ahandle;
pTrans->redoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *)); pTrans->redoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
pTrans->undoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *)); pTrans->undoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
pTrans->commitLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *)); pTrans->commitLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
...@@ -436,6 +441,11 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction) { ...@@ -436,6 +441,11 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction) {
return mndTransAppendAction(pTrans->undoActions, pAction); return mndTransAppendAction(pTrans->undoActions, pAction);
} }
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen) {
pTrans->rpcRsp = pCont;
pTrans->rpcRspLen = contLen;
}
static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
SSdbRaw *pRaw = mndTransActionEncode(pTrans); SSdbRaw *pRaw = mndTransActionEncode(pTrans);
if (pRaw == NULL) { if (pRaw == NULL) {
...@@ -479,6 +489,11 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { ...@@ -479,6 +489,11 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
pNew->rpcHandle = pTrans->rpcHandle; pNew->rpcHandle = pTrans->rpcHandle;
pNew->rpcAHandle = pTrans->rpcAHandle; pNew->rpcAHandle = pTrans->rpcAHandle;
pNew->rpcRsp = pTrans->rpcRsp;
pNew->rpcRspLen = pTrans->rpcRspLen;
pTrans->rpcRsp = NULL;
pTrans->rpcRspLen = 0;
mndTransExecute(pMnode, pNew); mndTransExecute(pMnode, pNew);
mndReleaseTrans(pMnode, pNew); mndReleaseTrans(pMnode, pNew);
return 0; return 0;
...@@ -529,15 +544,21 @@ static void mndTransSendRpcRsp(STrans *pTrans) { ...@@ -529,15 +544,21 @@ static void mndTransSendRpcRsp(STrans *pTrans) {
if (sendRsp && pTrans->rpcHandle != NULL) { if (sendRsp && pTrans->rpcHandle != NULL) {
mDebug("trans:%d, send rsp, code:0x%x stage:%d app:%p", pTrans->id, pTrans->code & 0xFFFF, pTrans->stage, mDebug("trans:%d, send rsp, code:0x%x stage:%d app:%p", pTrans->id, pTrans->code & 0xFFFF, pTrans->stage,
pTrans->rpcAHandle); pTrans->rpcAHandle);
SRpcMsg rspMsg = {.handle = pTrans->rpcHandle, .code = pTrans->code, .ahandle = pTrans->rpcAHandle}; SRpcMsg rspMsg = {.handle = pTrans->rpcHandle,
.code = pTrans->code,
.ahandle = pTrans->rpcAHandle,
.pCont = pTrans->rpcRsp,
.contLen = pTrans->rpcRspLen};
rpcSendResponse(&rspMsg); rpcSendResponse(&rspMsg);
pTrans->rpcHandle = NULL; pTrans->rpcHandle = NULL;
pTrans->rpcRsp = NULL;
pTrans->rpcRspLen = 0;
} }
} }
void mndTransProcessRsp(SMnodeMsg *pMsg) { void mndTransProcessRsp(SMnodeMsg *pRsp) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pRsp->pMnode;
int64_t signature = (int64_t)(pMsg->rpcMsg.ahandle); int64_t signature = (int64_t)(pRsp->rpcMsg.ahandle);
int32_t transId = (int32_t)(signature >> 32); int32_t transId = (int32_t)(signature >> 32);
int32_t action = (int32_t)((signature << 32) >> 32); int32_t action = (int32_t)((signature << 32) >> 32);
...@@ -571,10 +592,10 @@ void mndTransProcessRsp(SMnodeMsg *pMsg) { ...@@ -571,10 +592,10 @@ void mndTransProcessRsp(SMnodeMsg *pMsg) {
STransAction *pAction = taosArrayGet(pArray, action); STransAction *pAction = taosArrayGet(pArray, action);
if (pAction != NULL) { if (pAction != NULL) {
pAction->msgReceived = 1; pAction->msgReceived = 1;
pAction->errCode = pMsg->rpcMsg.code; pAction->errCode = pRsp->rpcMsg.code;
} }
mDebug("trans:%d, action:%d response is received, code:0x%x, accept:0x%x", transId, action, pMsg->rpcMsg.code, mDebug("trans:%d, action:%d response is received, code:0x%x, accept:0x%x", transId, action, pRsp->rpcMsg.code,
pAction->acceptableCode); pAction->acceptableCode);
mndTransExecute(pMnode, pTrans); mndTransExecute(pMnode, pTrans);
...@@ -921,7 +942,7 @@ static int32_t mndProcessTransMsg(SMnodeMsg *pMsg) { ...@@ -921,7 +942,7 @@ static int32_t mndProcessTransMsg(SMnodeMsg *pMsg) {
void mndTransPullup(SMnode *pMnode) { void mndTransPullup(SMnode *pMnode) {
STrans *pTrans = NULL; STrans *pTrans = NULL;
void * pIter = NULL; void *pIter = NULL;
while (1) { while (1) {
pIter = sdbFetch(pMnode->pSdb, SDB_TRANS, pIter, (void **)&pTrans); pIter = sdbFetch(pMnode->pSdb, SDB_TRANS, pIter, (void **)&pTrans);
......
...@@ -202,6 +202,10 @@ TEST_F(MndTestDb, 02_Create_Alter_Drop_Db) { ...@@ -202,6 +202,10 @@ TEST_F(MndTestDb, 02_Create_Alter_Drop_Db) {
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_DB, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_DB, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0); ASSERT_EQ(pRsp->code, 0);
SDropDbRsp* pDrop = (SDropDbRsp*)pRsp->pCont;
pDrop->uid = htobe64(pDrop->uid);
EXPECT_STREQ(pDrop->db, "1.d1");
} }
test.SendShowMetaReq(TSDB_MGMT_TABLE_DB, ""); test.SendShowMetaReq(TSDB_MGMT_TABLE_DB, "");
...@@ -249,6 +253,8 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) { ...@@ -249,6 +253,8 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) {
EXPECT_EQ(test.GetShowRows(), 1); EXPECT_EQ(test.GetShowRows(), 1);
CheckBinary("d2", TSDB_DB_NAME_LEN - 1); CheckBinary("d2", TSDB_DB_NAME_LEN - 1);
uint64_t d2_uid = 0;
{ {
int32_t contLen = sizeof(SUseDbReq); int32_t contLen = sizeof(SUseDbReq);
...@@ -262,6 +268,8 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) { ...@@ -262,6 +268,8 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) {
SUseDbRsp* pRsp = (SUseDbRsp*)pMsg->pCont; SUseDbRsp* pRsp = (SUseDbRsp*)pMsg->pCont;
EXPECT_STREQ(pRsp->db, "1.d2"); EXPECT_STREQ(pRsp->db, "1.d2");
pRsp->uid = htobe64(pRsp->uid);
d2_uid = pRsp->uid;
pRsp->vgVersion = htonl(pRsp->vgVersion); pRsp->vgVersion = htonl(pRsp->vgVersion);
pRsp->vgNum = htonl(pRsp->vgNum); pRsp->vgNum = htonl(pRsp->vgNum);
pRsp->hashMethod = pRsp->hashMethod; pRsp->hashMethod = pRsp->hashMethod;
...@@ -311,5 +319,10 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) { ...@@ -311,5 +319,10 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) {
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_DB, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_DB, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0); ASSERT_EQ(pRsp->code, 0);
SDropDbRsp* pDrop = (SDropDbRsp*)pRsp->pCont;
pDrop->uid = htobe64(pDrop->uid);
EXPECT_STREQ(pDrop->db, "1.d2");
EXPECT_EQ(pDrop->uid, d2_uid);
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册