提交 2b73e4f4 编写于 作者: S Shengliang Guan

serialize drop vnode req

上级 69ca3783
...@@ -2019,3 +2019,34 @@ int32_t tDeserializeSMTimerMsg(void *buf, int32_t bufLen, SMTimerReq *pReq) { ...@@ -2019,3 +2019,34 @@ int32_t tDeserializeSMTimerMsg(void *buf, int32_t bufLen, SMTimerReq *pReq) {
tCoderClear(&decoder); tCoderClear(&decoder);
return 0; return 0;
} }
int32_t tSerializeSDropVnodeReq(void *buf, int32_t bufLen, SDropVnodeReq *pReq) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1;
if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1;
if (tEncodeU64(&encoder, pReq->dbUid) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSDropVnodeReq(void *buf, int32_t bufLen, SDropVnodeReq *pReq) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->dbUid) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
}
\ No newline at end of file
...@@ -556,12 +556,6 @@ static void dndGenerateWrapperCfg(SDnode *pDnode, SCreateVnodeReq *pCreate, SWra ...@@ -556,12 +556,6 @@ static void dndGenerateWrapperCfg(SDnode *pDnode, SCreateVnodeReq *pCreate, SWra
pCfg->vgVersion = pCreate->vgVersion; pCfg->vgVersion = pCreate->vgVersion;
} }
static SDropVnodeReq *dndParseDropVnodeReq(SRpcMsg *pReq) {
SDropVnodeReq *pDrop = pReq->pCont;
pDrop->vgId = htonl(pDrop->vgId);
return pDrop;
}
int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SCreateVnodeReq *pCreate = dndParseCreateVnodeReq(pReq); SCreateVnodeReq *pCreate = dndParseCreateVnodeReq(pReq);
dDebug("vgId:%d, create vnode req is received", pCreate->vgId); dDebug("vgId:%d, create vnode req is received", pCreate->vgId);
...@@ -652,9 +646,10 @@ int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { ...@@ -652,9 +646,10 @@ int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
} }
int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SDropVnodeReq *pDrop = dndParseDropVnodeReq(pReq); SDropVnodeReq dropReq = {0};
tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &dropReq);
int32_t vgId = pDrop->vgId; int32_t vgId = dropReq.vgId;
dDebug("vgId:%d, drop vnode req is received", vgId); dDebug("vgId:%d, drop vnode req is received", vgId);
SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId); SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId);
...@@ -678,9 +673,10 @@ int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { ...@@ -678,9 +673,10 @@ int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
} }
int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SSyncVnodeReq *pSync = (SSyncVnodeReq *)dndParseDropVnodeReq(pReq); SSyncVnodeReq syncReq = {0};
tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &syncReq);
int32_t vgId = pSync->vgId; int32_t vgId = syncReq.vgId;
dDebug("vgId:%d, sync vnode req is received", vgId); dDebug("vgId:%d, sync vnode req is received", vgId);
SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId); SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId);
...@@ -700,9 +696,10 @@ int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { ...@@ -700,9 +696,10 @@ int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
} }
int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SCompactVnodeReq *pCompact = (SCompactVnodeReq *)dndParseDropVnodeReq(pReq); SCompactVnodeReq compatcReq = {0};
tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &compatcReq);
int32_t vgId = pCompact->vgId; int32_t vgId = compatcReq.vgId;
dDebug("vgId:%d, compact vnode req is received", vgId); dDebug("vgId:%d, compact vnode req is received", vgId);
SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId); SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId);
......
...@@ -312,17 +312,19 @@ TEST_F(DndTestVnode, 05_DROP_Stb) { ...@@ -312,17 +312,19 @@ TEST_F(DndTestVnode, 05_DROP_Stb) {
TEST_F(DndTestVnode, 06_Drop_Vnode) { TEST_F(DndTestVnode, 06_Drop_Vnode) {
for (int i = 0; i < 3; ++i) { for (int i = 0; i < 3; ++i) {
int32_t contLen = sizeof(SDropVnodeReq); SDropVnodeReq dropReq = {0};
dropReq.vgId = 2;
dropReq.dnodeId = 1;
strcpy(dropReq.db, "1.d1");
dropReq.dbUid = 9527;
SDropVnodeReq* pReq = (SDropVnodeReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSDropVnodeReq(NULL, 0, &dropReq);
pReq->vgId = htonl(2); void* pReq = rpcMallocCont(contLen);
pReq->dnodeId = htonl(1); tSerializeSDropVnodeReq(pReq, contLen, &dropReq);
strcpy(pReq->db, "1.d1");
pReq->dbUid = htobe64(9527);
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq; rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SDropVnodeReq); rpcMsg.contLen = contLen;
rpcMsg.msgType = TDMT_DND_DROP_VNODE; rpcMsg.msgType = TDMT_DND_DROP_VNODE;
SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_VNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_VNODE, pReq, contLen);
......
...@@ -32,7 +32,7 @@ SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup); ...@@ -32,7 +32,7 @@ SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup);
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId); int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId);
SCreateVnodeReq *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup); SCreateVnodeReq *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup);
SDropVnodeReq *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup); SDropVnodeReq *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -365,11 +365,12 @@ static int32_t mndSetCreateDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -365,11 +365,12 @@ static int32_t mndSetCreateDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
action.epSet = mndGetDnodeEpset(pDnode); action.epSet = mndGetDnodeEpset(pDnode);
mndReleaseDnode(pMnode, pDnode); mndReleaseDnode(pMnode, pDnode);
SDropVnodeReq *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup); int32_t contLen = 0;
SDropVnodeReq *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
if (pReq == NULL) return -1; if (pReq == NULL) return -1;
action.pCont = pReq; action.pCont = pReq;
action.contLen = sizeof(SDropVnodeReq); action.contLen = contLen;
action.msgType = TDMT_DND_DROP_VNODE; action.msgType = TDMT_DND_DROP_VNODE;
action.acceptableCode = TSDB_CODE_DND_VNODE_NOT_DEPLOYED; action.acceptableCode = TSDB_CODE_DND_VNODE_NOT_DEPLOYED;
if (mndTransAppendUndoAction(pTrans, &action) != 0) { if (mndTransAppendUndoAction(pTrans, &action) != 0) {
...@@ -754,11 +755,12 @@ static int32_t mndBuildDropVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj * ...@@ -754,11 +755,12 @@ static int32_t mndBuildDropVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *
action.epSet = mndGetDnodeEpset(pDnode); action.epSet = mndGetDnodeEpset(pDnode);
mndReleaseDnode(pMnode, pDnode); mndReleaseDnode(pMnode, pDnode);
SDropVnodeReq *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup); int32_t contLen = 0;
SDropVnodeReq *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
if (pReq == NULL) return -1; if (pReq == NULL) return -1;
action.pCont = pReq; action.pCont = pReq;
action.contLen = sizeof(SDropVnodeReq); action.contLen = contLen;
action.msgType = TDMT_DND_DROP_VNODE; action.msgType = TDMT_DND_DROP_VNODE;
action.acceptableCode = TSDB_CODE_DND_VNODE_NOT_DEPLOYED; action.acceptableCode = TSDB_CODE_DND_VNODE_NOT_DEPLOYED;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
......
...@@ -248,19 +248,29 @@ SCreateVnodeReq *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbOb ...@@ -248,19 +248,29 @@ SCreateVnodeReq *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbOb
return pCreate; return pCreate;
} }
SDropVnodeReq *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) { SDropVnodeReq *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup,
SDropVnodeReq *pDrop = calloc(1, sizeof(SDropVnodeReq)); int32_t *pContLen) {
if (pDrop == NULL) { SDropVnodeReq dropReq = {0};
dropReq.dnodeId = pDnode->id;
dropReq.vgId = pVgroup->vgId;
memcpy(dropReq.db, pDb->name, TSDB_DB_FNAME_LEN);
dropReq.dbUid = pDb->uid;
int32_t contLen = tSerializeSDropVnodeReq(NULL, 0, &dropReq);
if (contLen < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
pDrop->dnodeId = htonl(pDnode->id); void *pReq = malloc(contLen);
pDrop->vgId = htonl(pVgroup->vgId); if (pReq == NULL) {
memcpy(pDrop->db, pDb->name, TSDB_DB_FNAME_LEN); terrno = TSDB_CODE_OUT_OF_MEMORY;
pDrop->dbUid = htobe64(pDb->uid); return NULL;
}
return pDrop; tSerializeSDropVnodeReq(pReq, contLen, &dropReq);
*pContLen = contLen;
return pReq;
} }
static bool mndResetDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) { static bool mndResetDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册