diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 0aca468acbfee179aaa2426ce5f24c5e1cd16a05..f6af9782adc1efda36969abd20fb22fb20384394 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2020,6 +2020,87 @@ int32_t tDeserializeSMTimerMsg(void *buf, int32_t bufLen, SMTimerReq *pReq) { return 0; } +int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pReq) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1; + if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->db) < 0) return -1; + if (tEncodeU64(&encoder, pReq->dbUid) < 0) return -1; + if (tEncodeI32(&encoder, pReq->vgVersion) < 0) return -1; + if (tEncodeI32(&encoder, pReq->cacheBlockSize) < 0) return -1; + if (tEncodeI32(&encoder, pReq->totalBlocks) < 0) return -1; + if (tEncodeI32(&encoder, pReq->daysPerFile) < 0) return -1; + if (tEncodeI32(&encoder, pReq->daysToKeep0) < 0) return -1; + if (tEncodeI32(&encoder, pReq->daysToKeep1) < 0) return -1; + if (tEncodeI32(&encoder, pReq->daysToKeep2) < 0) return -1; + if (tEncodeI32(&encoder, pReq->minRows) < 0) return -1; + if (tEncodeI32(&encoder, pReq->maxRows) < 0) return -1; + if (tEncodeI32(&encoder, pReq->commitTime) < 0) return -1; + if (tEncodeI32(&encoder, pReq->fsyncPeriod) < 0) return -1; + if (tEncodeI8(&encoder, pReq->walLevel) < 0) return -1; + if (tEncodeI8(&encoder, pReq->precision) < 0) return -1; + if (tEncodeI8(&encoder, pReq->compression) < 0) return -1; + if (tEncodeI8(&encoder, pReq->quorum) < 0) return -1; + if (tEncodeI8(&encoder, pReq->update) < 0) return -1; + if (tEncodeI8(&encoder, pReq->cacheLastRow) < 0) return -1; + if (tEncodeI8(&encoder, pReq->replica) < 0) return -1; + if (tEncodeI8(&encoder, pReq->selfIndex) < 0) return -1; + for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) { + SReplica *pReplica = &pReq->replicas[i]; + if (tEncodeI32(&encoder, pReplica->id) < 0) return -1; + if (tEncodeU16(&encoder, pReplica->port) < 0) return -1; + if (tEncodeCStr(&encoder, pReplica->fqdn) < 0) return -1; + } + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pReq) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1; + if (tDecodeU64(&decoder, &pReq->dbUid) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->vgVersion) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->cacheBlockSize) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->totalBlocks) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->daysPerFile) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->daysToKeep0) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->daysToKeep1) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->daysToKeep2) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->minRows) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->maxRows) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->commitTime) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->fsyncPeriod) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->walLevel) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->precision) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->compression) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->quorum) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->update) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->cacheLastRow) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->replica) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->selfIndex) < 0) return -1; + for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) { + SReplica *pReplica = &pReq->replicas[i]; + if (tDecodeI32(&decoder, &pReplica->id) < 0) return -1; + if (tDecodeU16(&decoder, &pReplica->port) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReplica->fqdn) < 0) return -1; + } + + tEndDecode(&decoder); + tCoderClear(&decoder); + return 0; +} + int32_t tSerializeSDropVnodeReq(void *buf, int32_t bufLen, SDropVnodeReq *pReq) { SCoder encoder = {0}; tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); @@ -2043,7 +2124,7 @@ int32_t tDeserializeSDropVnodeReq(void *buf, int32_t bufLen, SDropVnodeReq *pReq if (tStartDecode(&decoder) < 0) return -1; if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1; if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1; - if (tDecodeI64(&decoder, &pReq->dbUid) < 0) return -1; + if (tDecodeU64(&decoder, &pReq->dbUid) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1; tEndDecode(&decoder); diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index 0a417788f9dd7928ddfb19dac8c078e765ae982d..ebb2d1b4f0e4b9d63ae862bfa47b755bb3dcb9f9 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -15,8 +15,8 @@ #define _DEFAULT_SOURCE #include "dndVnodes.h" -#include "dndTransport.h" #include "dndMgmt.h" +#include "dndTransport.h" typedef struct { int32_t vgId; @@ -34,9 +34,9 @@ typedef struct { int8_t dropped; int8_t accessState; uint64_t dbUid; - char * db; - char * path; - SVnode * pImpl; + char *db; + char *path; + SVnode *pImpl; STaosQueue *pWriteQ; STaosQueue *pSyncQ; STaosQueue *pApplyQ; @@ -50,7 +50,7 @@ typedef struct { int32_t failed; int32_t threadIndex; pthread_t thread; - SDnode * pDnode; + SDnode *pDnode; SWrapperCfg *pCfgs; } SVnodeThread; @@ -68,7 +68,7 @@ void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pE void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg); -static SVnodeObj * dndAcquireVnode(SDnode *pDnode, int32_t vgId); +static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId); static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode); static int32_t dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl); static void dndCloseVnode(SDnode *pDnode, SVnodeObj *pVnode); @@ -81,7 +81,7 @@ static void dndCloseVnodes(SDnode *pDnode); static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; - SVnodeObj * pVnode = NULL; + SVnodeObj *pVnode = NULL; int32_t refCount = 0; taosRLockLatch(&pMgmt->latch); @@ -112,7 +112,7 @@ static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode) { static int32_t dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; - SVnodeObj * pVnode = calloc(1, sizeof(SVnodeObj)); + SVnodeObj *pVnode = calloc(1, sizeof(SVnodeObj)); if (pVnode == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -189,7 +189,7 @@ static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes) { void *pIter = taosHashIterate(pMgmt->hash, NULL); while (pIter) { SVnodeObj **ppVnode = pIter; - SVnodeObj * pVnode = *ppVnode; + SVnodeObj *pVnode = *ppVnode; if (pVnode && num < size) { int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount); @@ -211,9 +211,9 @@ static int32_t dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_ int32_t code = TSDB_CODE_DND_VNODE_READ_FILE_ERROR; int32_t len = 0; int32_t maxLen = 30000; - char * content = calloc(1, maxLen + 1); - cJSON * root = NULL; - FILE * fp = NULL; + char *content = calloc(1, maxLen + 1); + cJSON *root = NULL; + FILE *fp = NULL; char file[PATH_MAX + 20] = {0}; SWrapperCfg *pCfgs = NULL; @@ -254,7 +254,7 @@ static int32_t dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_ } for (int32_t i = 0; i < vnodesNum; ++i) { - cJSON * vnode = cJSON_GetArrayItem(vnodes, i); + cJSON *vnode = cJSON_GetArrayItem(vnodes, i); SWrapperCfg *pCfg = &pCfgs[i]; cJSON *vgId = cJSON_GetObjectItem(vnode, "vgId"); @@ -326,7 +326,7 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode) { int32_t len = 0; int32_t maxLen = 65536; - char * content = calloc(1, maxLen + 1); + char *content = calloc(1, maxLen + 1); len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, " \"vnodes\": [\n"); @@ -368,8 +368,8 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode) { static void *dnodeOpenVnodeFunc(void *param) { SVnodeThread *pThread = param; - SDnode * pDnode = pThread->pDnode; - SVnodesMgmt * pMgmt = &pDnode->vmgmt; + SDnode *pDnode = pThread->pDnode; + SVnodesMgmt *pMgmt = &pDnode->vmgmt; dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum); setThreadName("open-vnodes"); @@ -383,7 +383,7 @@ static void *dnodeOpenVnodeFunc(void *param) { dndReportStartup(pDnode, "open-vnodes", stepDesc); SVnodeCfg cfg = {.pDnode = pDnode, .pTfs = pDnode->pTfs, .vgId = pCfg->vgId, .dbId = pCfg->dbUid}; - SVnode * pImpl = vnodeOpen(pCfg->path, &cfg); + SVnode *pImpl = vnodeOpen(pCfg->path, &cfg); if (pImpl == NULL) { dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex); pThread->failed++; @@ -499,31 +499,6 @@ static void dndCloseVnodes(SDnode *pDnode) { dInfo("total vnodes:%d are all closed", numOfVnodes); } -static SCreateVnodeReq *dndParseCreateVnodeReq(SRpcMsg *pReq) { - SCreateVnodeReq *pCreate = pReq->pCont; - pCreate->vgId = htonl(pCreate->vgId); - pCreate->dnodeId = htonl(pCreate->dnodeId); - pCreate->dbUid = htobe64(pCreate->dbUid); - pCreate->vgVersion = htonl(pCreate->vgVersion); - pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize); - pCreate->totalBlocks = htonl(pCreate->totalBlocks); - pCreate->daysPerFile = htonl(pCreate->daysPerFile); - pCreate->daysToKeep0 = htonl(pCreate->daysToKeep0); - pCreate->daysToKeep1 = htonl(pCreate->daysToKeep1); - pCreate->daysToKeep2 = htonl(pCreate->daysToKeep2); - pCreate->minRows = htonl(pCreate->minRows); - pCreate->maxRows = htonl(pCreate->maxRows); - pCreate->commitTime = htonl(pCreate->commitTime); - pCreate->fsyncPeriod = htonl(pCreate->fsyncPeriod); - for (int r = 0; r < pCreate->replica; ++r) { - SReplica *pReplica = &pCreate->replicas[r]; - pReplica->id = htonl(pReplica->id); - pReplica->port = htons(pReplica->port); - } - - return pCreate; -} - static void dndGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { pCfg->vgId = pCreate->vgId; pCfg->wsize = pCreate->cacheBlockSize; @@ -557,24 +532,29 @@ static void dndGenerateWrapperCfg(SDnode *pDnode, SCreateVnodeReq *pCreate, SWra } int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { - SCreateVnodeReq *pCreate = dndParseCreateVnodeReq(pReq); - dDebug("vgId:%d, create vnode req is received", pCreate->vgId); + SCreateVnodeReq createReq = {0}; + if (tDeserializeSCreateVnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + dDebug("vgId:%d, create vnode req is received", createReq.vgId); SVnodeCfg vnodeCfg = {0}; - dndGenerateVnodeCfg(pCreate, &vnodeCfg); + dndGenerateVnodeCfg(&createReq, &vnodeCfg); SWrapperCfg wrapperCfg = {0}; - dndGenerateWrapperCfg(pDnode, pCreate, &wrapperCfg); + dndGenerateWrapperCfg(pDnode, &createReq, &wrapperCfg); - if (pCreate->dnodeId != dndGetDnodeId(pDnode)) { + if (createReq.dnodeId != dndGetDnodeId(pDnode)) { terrno = TSDB_CODE_DND_VNODE_INVALID_OPTION; - dDebug("vgId:%d, failed to create vnode since %s", pCreate->vgId, terrstr()); + dDebug("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr()); return -1; } - SVnodeObj *pVnode = dndAcquireVnode(pDnode, pCreate->vgId); + SVnodeObj *pVnode = dndAcquireVnode(pDnode, createReq.vgId); if (pVnode != NULL) { - dDebug("vgId:%d, already exist", pCreate->vgId); + dDebug("vgId:%d, already exist", createReq.vgId); dndReleaseVnode(pDnode, pVnode); terrno = TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED; return -1; @@ -585,13 +565,13 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { vnodeCfg.dbId = wrapperCfg.dbUid; SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg); if (pImpl == NULL) { - dError("vgId:%d, failed to create vnode since %s", pCreate->vgId, terrstr()); + dError("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr()); return -1; } int32_t code = dndOpenVnode(pDnode, &wrapperCfg, pImpl); if (code != 0) { - dError("vgId:%d, failed to open vnode since %s", pCreate->vgId, terrstr()); + dError("vgId:%d, failed to open vnode since %s", createReq.vgId, terrstr()); vnodeClose(pImpl); vnodeDestroy(wrapperCfg.path); terrno = code; @@ -610,32 +590,37 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { } int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { - SAlterVnodeReq *pAlter = (SAlterVnodeReq *)dndParseCreateVnodeReq(pReq); - dDebug("vgId:%d, alter vnode req is received", pAlter->vgId); + SAlterVnodeReq alterReq = {0}; + if (tDeserializeSCreateVnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + dDebug("vgId:%d, alter vnode req is received", alterReq.vgId); SVnodeCfg vnodeCfg = {0}; - dndGenerateVnodeCfg(pAlter, &vnodeCfg); + dndGenerateVnodeCfg(&alterReq, &vnodeCfg); - SVnodeObj *pVnode = dndAcquireVnode(pDnode, pAlter->vgId); + SVnodeObj *pVnode = dndAcquireVnode(pDnode, alterReq.vgId); if (pVnode == NULL) { - dDebug("vgId:%d, failed to alter vnode since %s", pAlter->vgId, terrstr()); + dDebug("vgId:%d, failed to alter vnode since %s", alterReq.vgId, terrstr()); return -1; } - if (pAlter->vgVersion == pVnode->vgVersion) { + if (alterReq.vgVersion == pVnode->vgVersion) { dndReleaseVnode(pDnode, pVnode); - dDebug("vgId:%d, no need to alter vnode cfg for version unchanged ", pAlter->vgId); + dDebug("vgId:%d, no need to alter vnode cfg for version unchanged ", alterReq.vgId); return 0; } if (vnodeAlter(pVnode->pImpl, &vnodeCfg) != 0) { - dError("vgId:%d, failed to alter vnode since %s", pAlter->vgId, terrstr()); + dError("vgId:%d, failed to alter vnode since %s", alterReq.vgId, terrstr()); dndReleaseVnode(pDnode, pVnode); return -1; } int32_t oldVersion = pVnode->vgVersion; - pVnode->vgVersion = pAlter->vgVersion; + pVnode->vgVersion = alterReq.vgVersion; int32_t code = dndWriteVnodesToFile(pDnode); if (code != 0) { pVnode->vgVersion = oldVersion; @@ -647,7 +632,10 @@ int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { SDropVnodeReq dropReq = {0}; - tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &dropReq); + if (tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } int32_t vgId = dropReq.vgId; dDebug("vgId:%d, drop vnode req is received", vgId); diff --git a/source/dnode/mgmt/impl/test/vnode/vnode.cpp b/source/dnode/mgmt/impl/test/vnode/vnode.cpp index 0e9f0bd5bf95b73a07f614e62b2dbcc03b5c7711..4457faf7b1aa97215fa9433385e9e0438f8cd5da 100644 --- a/source/dnode/mgmt/impl/test/vnode/vnode.cpp +++ b/source/dnode/mgmt/impl/test/vnode/vnode.cpp @@ -27,38 +27,40 @@ Testbase DndTestVnode::test; TEST_F(DndTestVnode, 01_Create_Vnode) { for (int i = 0; i < 3; ++i) { - int32_t contLen = sizeof(SCreateVnodeReq); - - SCreateVnodeReq* pReq = (SCreateVnodeReq*)rpcMallocCont(contLen); - pReq->vgId = htonl(2); - pReq->dnodeId = htonl(1); - strcpy(pReq->db, "1.d1"); - pReq->dbUid = htobe64(9527); - pReq->vgVersion = htonl(1); - pReq->cacheBlockSize = htonl(16); - pReq->totalBlocks = htonl(10); - pReq->daysPerFile = htonl(10); - pReq->daysToKeep0 = htonl(3650); - pReq->daysToKeep1 = htonl(3650); - pReq->daysToKeep2 = htonl(3650); - pReq->minRows = htonl(100); - pReq->minRows = htonl(4096); - pReq->commitTime = htonl(3600); - pReq->fsyncPeriod = htonl(3000); - pReq->walLevel = 1; - pReq->precision = 0; - pReq->compression = 2; - pReq->replica = 1; - pReq->quorum = 1; - pReq->update = 0; - pReq->cacheLastRow = 0; - pReq->selfIndex = 0; - for (int r = 0; r < pReq->replica; ++r) { - SReplica* pReplica = &pReq->replicas[r]; - pReplica->id = htonl(1); - pReplica->port = htons(9527); + SCreateVnodeReq createReq = {0}; + createReq.vgId = 2; + createReq.dnodeId = 1; + strcpy(createReq.db, "1.d1"); + createReq.dbUid = 9527; + createReq.vgVersion = 1; + createReq.cacheBlockSize = 16; + createReq.totalBlocks = 10; + createReq.daysPerFile = 10; + createReq.daysToKeep0 = 3650; + createReq.daysToKeep1 = 3650; + createReq.daysToKeep2 = 3650; + createReq.minRows = 100; + createReq.minRows = 4096; + createReq.commitTime = 3600; + createReq.fsyncPeriod = 3000; + createReq.walLevel = 1; + createReq.precision = 0; + createReq.compression = 2; + createReq.replica = 1; + createReq.quorum = 1; + createReq.update = 0; + createReq.cacheLastRow = 0; + createReq.selfIndex = 0; + for (int r = 0; r < createReq.replica; ++r) { + SReplica* pReplica = &createReq.replicas[r]; + pReplica->id = 1; + pReplica->port = 9527; } + int32_t contLen = tSerializeSCreateVnodeReq(NULL, 0, &createReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSCreateVnodeReq(pReq, contLen, &createReq); + SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_VNODE, pReq, contLen); ASSERT_NE(pRsp, nullptr); if (i == 0) { @@ -70,38 +72,40 @@ TEST_F(DndTestVnode, 01_Create_Vnode) { } { - int32_t contLen = sizeof(SCreateVnodeReq); - - SCreateVnodeReq* pReq = (SCreateVnodeReq*)rpcMallocCont(contLen); - pReq->vgId = htonl(2); - pReq->dnodeId = htonl(3); - strcpy(pReq->db, "1.d1"); - pReq->dbUid = htobe64(9527); - pReq->vgVersion = htonl(1); - pReq->cacheBlockSize = htonl(16); - pReq->totalBlocks = htonl(10); - pReq->daysPerFile = htonl(10); - pReq->daysToKeep0 = htonl(3650); - pReq->daysToKeep1 = htonl(3650); - pReq->daysToKeep2 = htonl(3650); - pReq->minRows = htonl(100); - pReq->minRows = htonl(4096); - pReq->commitTime = htonl(3600); - pReq->fsyncPeriod = htonl(3000); - pReq->walLevel = 1; - pReq->precision = 0; - pReq->compression = 2; - pReq->replica = 1; - pReq->quorum = 1; - pReq->update = 0; - pReq->cacheLastRow = 0; - pReq->selfIndex = 0; - for (int r = 0; r < pReq->replica; ++r) { - SReplica* pReplica = &pReq->replicas[r]; - pReplica->id = htonl(1); - pReplica->port = htons(9527); + SCreateVnodeReq createReq = {0}; + createReq.vgId = 2; + createReq.dnodeId = 3; + strcpy(createReq.db, "1.d1"); + createReq.dbUid = 9527; + createReq.vgVersion = 1; + createReq.cacheBlockSize = 16; + createReq.totalBlocks = 10; + createReq.daysPerFile = 10; + createReq.daysToKeep0 = 3650; + createReq.daysToKeep1 = 3650; + createReq.daysToKeep2 = 3650; + createReq.minRows = 100; + createReq.minRows = 4096; + createReq.commitTime = 3600; + createReq.fsyncPeriod = 3000; + createReq.walLevel = 1; + createReq.precision = 0; + createReq.compression = 2; + createReq.replica = 1; + createReq.quorum = 1; + createReq.update = 0; + createReq.cacheLastRow = 0; + createReq.selfIndex = 0; + for (int r = 0; r < createReq.replica; ++r) { + SReplica* pReplica = &createReq.replicas[r]; + pReplica->id = 1; + pReplica->port = 9527; } + int32_t contLen = tSerializeSCreateVnodeReq(NULL, 0, &createReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSCreateVnodeReq(pReq, contLen, &createReq); + SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_VNODE, pReq, contLen); ASSERT_NE(pRsp, nullptr); ASSERT_EQ(pRsp->code, TSDB_CODE_DND_VNODE_INVALID_OPTION); @@ -110,38 +114,40 @@ TEST_F(DndTestVnode, 01_Create_Vnode) { TEST_F(DndTestVnode, 02_Alter_Vnode) { for (int i = 0; i < 3; ++i) { - int32_t contLen = sizeof(SAlterVnodeReq); - - SAlterVnodeReq* pReq = (SAlterVnodeReq*)rpcMallocCont(contLen); - pReq->vgId = htonl(2); - pReq->dnodeId = htonl(1); - strcpy(pReq->db, "1.d1"); - pReq->dbUid = htobe64(9527); - pReq->vgVersion = htonl(2); - pReq->cacheBlockSize = htonl(16); - pReq->totalBlocks = htonl(10); - pReq->daysPerFile = htonl(10); - pReq->daysToKeep0 = htonl(3650); - pReq->daysToKeep1 = htonl(3650); - pReq->daysToKeep2 = htonl(3650); - pReq->minRows = htonl(100); - pReq->minRows = htonl(4096); - pReq->commitTime = htonl(3600); - pReq->fsyncPeriod = htonl(3000); - pReq->walLevel = 1; - pReq->precision = 0; - pReq->compression = 2; - pReq->replica = 1; - pReq->quorum = 1; - pReq->update = 0; - pReq->cacheLastRow = 0; - pReq->selfIndex = 0; - for (int r = 0; r < pReq->replica; ++r) { - SReplica* pReplica = &pReq->replicas[r]; - pReplica->id = htonl(1); - pReplica->port = htons(9527); + SAlterVnodeReq alterReq = {0}; + alterReq.vgId = 2; + alterReq.dnodeId = 1; + strcpy(alterReq.db, "1.d1"); + alterReq.dbUid = 9527; + alterReq.vgVersion = 2; + alterReq.cacheBlockSize = 16; + alterReq.totalBlocks = 10; + alterReq.daysPerFile = 10; + alterReq.daysToKeep0 = 3650; + alterReq.daysToKeep1 = 3650; + alterReq.daysToKeep2 = 3650; + alterReq.minRows = 100; + alterReq.minRows = 4096; + alterReq.commitTime = 3600; + alterReq.fsyncPeriod = 3000; + alterReq.walLevel = 1; + alterReq.precision = 0; + alterReq.compression = 2; + alterReq.replica = 1; + alterReq.quorum = 1; + alterReq.update = 0; + alterReq.cacheLastRow = 0; + alterReq.selfIndex = 0; + for (int r = 0; r < alterReq.replica; ++r) { + SReplica* pReplica = &alterReq.replicas[r]; + pReplica->id = 1; + pReplica->port = 9527; } + int32_t contLen = tSerializeSCreateVnodeReq(NULL, 0, &alterReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSCreateVnodeReq(pReq, contLen, &alterReq); + SRpcMsg* pRsp = test.SendReq(TDMT_DND_ALTER_VNODE, pReq, contLen); ASSERT_NE(pRsp, nullptr); ASSERT_EQ(pRsp->code, 0); diff --git a/source/dnode/mnode/impl/inc/mndVgroup.h b/source/dnode/mnode/impl/inc/mndVgroup.h index ea9d509815f4b1598529a0b86a43b3e43fb2090c..85e9a15bd422737ab29f05a431e7f9267cb5d096 100644 --- a/source/dnode/mnode/impl/inc/mndVgroup.h +++ b/source/dnode/mnode/impl/inc/mndVgroup.h @@ -31,8 +31,8 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups); SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup); int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId); -SCreateVnodeReq *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup); -SDropVnodeReq *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen); +void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen); +void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index ce308e221ff78374b16c7ba3e80064e92cd01907..e772ff326c7dd5f745bb0c39d62e034da99e29bd 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -335,11 +335,12 @@ static int32_t mndSetCreateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj action.epSet = mndGetDnodeEpset(pDnode); mndReleaseDnode(pMnode, pDnode); - SCreateVnodeReq *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup); + int32_t contLen = 0; + void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen); if (pReq == NULL) return -1; action.pCont = pReq; - action.contLen = sizeof(SCreateVnodeReq); + action.contLen = contLen; action.msgType = TDMT_DND_CREATE_VNODE; action.acceptableCode = TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED; if (mndTransAppendRedoAction(pTrans, &action) != 0) { @@ -365,8 +366,8 @@ static int32_t mndSetCreateDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj action.epSet = mndGetDnodeEpset(pDnode); mndReleaseDnode(pMnode, pDnode); - int32_t contLen = 0; - SDropVnodeReq *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen); + int32_t contLen = 0; + void *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen); if (pReq == NULL) return -1; action.pCont = pReq; @@ -578,11 +579,12 @@ static int32_t mndBuildUpdateVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj action.epSet = mndGetDnodeEpset(pDnode); mndReleaseDnode(pMnode, pDnode); - SAlterVnodeReq *pReq = (SAlterVnodeReq *)mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup); + int32_t contLen = 0; + void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen); if (pReq == NULL) return -1; action.pCont = pReq; - action.contLen = sizeof(SAlterVnodeReq); + action.contLen = contLen; action.msgType = TDMT_DND_ALTER_VNODE; if (mndTransAppendRedoAction(pTrans, &action) != 0) { free(pReq); @@ -755,8 +757,8 @@ static int32_t mndBuildDropVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj * action.epSet = mndGetDnodeEpset(pDnode); mndReleaseDnode(pMnode, pDnode); - int32_t contLen = 0; - SDropVnodeReq *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen); + int32_t contLen = 0; + void *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen); if (pReq == NULL) return -1; action.pCont = pReq; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 997388c82865062145f2a79b426876028768dffa..90d190c1be1dbe4867e2d375f97d6a544bb84a88 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -189,43 +189,37 @@ void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) { sdbRelease(pSdb, pVgroup); } -SCreateVnodeReq *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) { - SCreateVnodeReq *pCreate = calloc(1, sizeof(SCreateVnodeReq)); - if (pCreate == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - - pCreate->vgId = htonl(pVgroup->vgId); - pCreate->dnodeId = htonl(pDnode->id); - memcpy(pCreate->db, pDb->name, TSDB_DB_FNAME_LEN); - pCreate->dbUid = htobe64(pDb->uid); - pCreate->vgVersion = htonl(pVgroup->version); - pCreate->cacheBlockSize = htonl(pDb->cfg.cacheBlockSize); - pCreate->totalBlocks = htonl(pDb->cfg.totalBlocks); - pCreate->daysPerFile = htonl(pDb->cfg.daysPerFile); - pCreate->daysToKeep0 = htonl(pDb->cfg.daysToKeep0); - pCreate->daysToKeep1 = htonl(pDb->cfg.daysToKeep1); - pCreate->daysToKeep2 = htonl(pDb->cfg.daysToKeep2); - pCreate->minRows = htonl(pDb->cfg.minRows); - pCreate->maxRows = htonl(pDb->cfg.maxRows); - pCreate->commitTime = htonl(pDb->cfg.commitTime); - pCreate->fsyncPeriod = htonl(pDb->cfg.fsyncPeriod); - pCreate->walLevel = pDb->cfg.walLevel; - pCreate->precision = pDb->cfg.precision; - pCreate->compression = pDb->cfg.compression; - pCreate->quorum = pDb->cfg.quorum; - pCreate->update = pDb->cfg.update; - pCreate->cacheLastRow = pDb->cfg.cacheLastRow; - pCreate->replica = pVgroup->replica; - pCreate->selfIndex = -1; +void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) { + SCreateVnodeReq createReq = {0}; + createReq.vgId = pVgroup->vgId; + createReq.dnodeId = pDnode->id; + memcpy(createReq.db, pDb->name, TSDB_DB_FNAME_LEN); + createReq.dbUid = pDb->uid; + createReq.vgVersion = pVgroup->version; + createReq.cacheBlockSize = pDb->cfg.cacheBlockSize; + createReq.totalBlocks = pDb->cfg.totalBlocks; + createReq.daysPerFile = pDb->cfg.daysPerFile; + createReq.daysToKeep0 = pDb->cfg.daysToKeep0; + createReq.daysToKeep1 = pDb->cfg.daysToKeep1; + createReq.daysToKeep2 = pDb->cfg.daysToKeep2; + createReq.minRows = pDb->cfg.minRows; + createReq.maxRows = pDb->cfg.maxRows; + createReq.commitTime = pDb->cfg.commitTime; + createReq.fsyncPeriod = pDb->cfg.fsyncPeriod; + createReq.walLevel = pDb->cfg.walLevel; + createReq.precision = pDb->cfg.precision; + createReq.compression = pDb->cfg.compression; + createReq.quorum = pDb->cfg.quorum; + createReq.update = pDb->cfg.update; + createReq.cacheLastRow = pDb->cfg.cacheLastRow; + createReq.replica = pVgroup->replica; + createReq.selfIndex = -1; for (int32_t v = 0; v < pVgroup->replica; ++v) { - SReplica *pReplica = &pCreate->replicas[v]; + SReplica *pReplica = &createReq.replicas[v]; SVnodeGid *pVgid = &pVgroup->vnodeGid[v]; SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); if (pVgidDnode == NULL) { - free(pCreate); return NULL; } @@ -235,20 +229,33 @@ SCreateVnodeReq *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbOb mndReleaseDnode(pMnode, pVgidDnode); if (pDnode->id == pVgid->dnodeId) { - pCreate->selfIndex = v; + createReq.selfIndex = v; } } - if (pCreate->selfIndex == -1) { - free(pCreate); + if (createReq.selfIndex == -1) { terrno = TSDB_CODE_MND_APP_ERROR; return NULL; } - return pCreate; + int32_t contLen = tSerializeSCreateVnodeReq(NULL, 0, &createReq); + if (contLen < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + void *pReq = malloc(contLen); + if (pReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + tSerializeSCreateVnodeReq(pReq, contLen, &createReq); + *pContLen = contLen; + return pReq; } -SDropVnodeReq *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, +void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) { SDropVnodeReq dropReq = {0}; dropReq.dnodeId = pDnode->id;