diff --git a/include/common/tmsg.h b/include/common/tmsg.h index da48846a8f24c63187d9dde1f379a2bf26eadd09..670fe6415d589efda5745a67b5bb78e3ab93b3d1 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1524,6 +1524,7 @@ typedef struct { int32_t tEncodeSRSmaParam(SCoder* pCoder, const SRSmaParam* pRSmaParam); int32_t tDecodeSRSmaParam(SCoder* pCoder, SRSmaParam* pRSmaParam); +// TDMT_VND_CREATE_STB ============== typedef struct SVCreateStbReq { const char* name; tb_uid_t suid; @@ -1536,17 +1537,14 @@ typedef struct SVCreateStbReq { int tEncodeSVCreateStbReq(SCoder* pCoder, const SVCreateStbReq* pReq); int tDecodeSVCreateStbReq(SCoder* pCoder, SVCreateStbReq* pReq); +// TDMT_VND_DROP_STB ============== typedef struct SVDropStbReq { - // data -#ifdef WINDOWS - size_t avoidCompilationErrors; -#endif - + const char* name; + tb_uid_t suid; } SVDropStbReq; -typedef struct SVCreateStbRsp { - int code; -} SVCreateStbRsp; +int32_t tEncodeSVDropStbReq(SCoder* pCoder, const SVDropStbReq* pReq); +int32_t tDecodeSVDropStbReq(SCoder* pCoder, SVDropStbReq* pReq); typedef struct SVCreateTbReq { tb_uid_t uid; @@ -1603,19 +1601,36 @@ int tDecodeSVCreateTbBatchRsp(SCoder* pCoder, SVCreateTbBatchRsp* pRsp); int32_t tSerializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatchRsp* pRsp); int32_t tDeserializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatchRsp* pRsp); +// TDMT_VND_DROP_TABLE ================= typedef struct { - int64_t ver; - char* name; - uint8_t type; - tb_uid_t suid; + const char* name; } SVDropTbReq; typedef struct { - int tmp; // TODO: to avoid compile error + int32_t code; } SVDropTbRsp; -int32_t tSerializeSVDropTbReq(void** buf, SVDropTbReq* pReq); -void* tDeserializeSVDropTbReq(void* buf, SVDropTbReq* pReq); +typedef struct { + int32_t nReqs; + union { + SVDropTbReq* pReqs; + SArray* pArray; + }; +} SVDropTbBatchReq; + +int32_t tEncodeSVDropTbBatchReq(SCoder* pCoder, const SVDropTbBatchReq* pReq); +int32_t tDecodeSVDropTbBatchReq(SCoder* pCoder, SVDropTbBatchReq* pReq); + +typedef struct { + int32_t nRsps; + union { + SVDropTbRsp* pRsps; + SArray* pArray; + }; +} SVDropTbBatchRsp; + +int32_t tEncodeSVDropTbBatchRsp(SCoder* pCoder, const SVDropTbBatchRsp* pRsp); +int32_t tDecodeSVDropTbBatchRsp(SCoder* pCoder, SVDropTbBatchRsp* pRsp); typedef struct { SMsgHead head; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index e1ee946ce4e0678d35a809304429a5d5d7d57b90..b0439c722cb187da01ea5e14f451a3d11e1d17d1 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -170,9 +170,9 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_UPDATE_TAG_VAL, "vnode-update-tag-val", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TABLE_META, "vnode-table-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TABLES_META, "vnode-tables-meta", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_CREATE_STB, "vnode-create-stb", SVCreateTbReq, SVCreateTbRsp) + TD_DEF_MSG_TYPE(TDMT_VND_CREATE_STB, "vnode-create-stb", SVCreateStbReq, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_STB, "vnode-alter-stb", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_DROP_STB, "vnode-drop-stb", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_DROP_STB, "vnode-drop-stb", SVDropStbReq, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONSUME, "vnode-mq-consume", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_QUERY, "vnode-mq-query", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONNECT, "vnode-mq-connect", NULL, NULL) diff --git a/include/util/tencode.h b/include/util/tencode.h index c5066996f3ffb7091f3bf396b625c6ba50ce0ec0..b081d1a157369ad244fed870b126901f1009374a 100644 --- a/include/util/tencode.h +++ b/include/util/tencode.h @@ -79,17 +79,6 @@ typedef struct { #define TD_CODER_CURRENT(CODER) ((CODER)->data + (CODER)->pos) #define TD_CODER_MOVE_POS(CODER, MOVE) ((CODER)->pos += (MOVE)) #define TD_CODER_CHECK_CAPACITY_FAILED(CODER, EXPSIZE) (((CODER)->size - (CODER)->pos) < (EXPSIZE)) -// #define TCODER_MALLOC(PCODER, SIZE) \ -// ({ \ -// void* ptr = NULL; \ -// SCoderMem* pMem = (SCoderMem*)taosMemoryMalloc(sizeof(*pMem) + (SIZE)); \ -// if (pMem) { \ -// pMem->next = (PCODER)->mList; \ -// (PCODER)->mList = pMem; \ -// ptr = (void*)&pMem[1]; \ -// } \ -// ptr; \ -// }) static FORCE_INLINE void* tCoderMalloc(SCoder* pCoder, int32_t size) { void* ptr = NULL; SCoderMem* pMem = (SCoderMem*)taosMemoryMalloc(sizeof(SCoderMem*) + size); @@ -102,8 +91,9 @@ static FORCE_INLINE void* tCoderMalloc(SCoder* pCoder, int32_t size) { } #define tEncodeSize(E, S, SIZE, RET) \ - do{ \ + do { \ SCoder coder = {0}; \ + RET = 0; \ tCoderInit(&coder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); \ if ((E)(&coder, S) == 0) { \ SIZE = coder.pos; \ @@ -111,7 +101,7 @@ static FORCE_INLINE void* tCoderMalloc(SCoder* pCoder, int32_t size) { RET = -1; \ } \ tCoderClear(&coder); \ - }while(0) + } while (0) // #define tEncodeSize(E, S, SIZE) \ // ({ \ // SCoder coder = {0}; \ diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index afab703e773cb19180fbbbe232ecac3a42f510e8..3003baaf56b7a8b24e88a381e38b5062f211c16c 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -490,21 +490,6 @@ int32_t tDeserializeSClientHbBatchRsp(void *buf, int32_t bufLen, SClientHbBatchR return 0; } -int32_t tSerializeSVDropTbReq(void **buf, SVDropTbReq *pReq) { - int32_t tlen = 0; - tlen += taosEncodeFixedI64(buf, pReq->ver); - tlen += taosEncodeString(buf, pReq->name); - tlen += taosEncodeFixedU8(buf, pReq->type); - return tlen; -} - -void *tDeserializeSVDropTbReq(void *buf, SVDropTbReq *pReq) { - buf = taosDecodeFixedI64(buf, &pReq->ver); - buf = taosDecodeString(buf, &pReq->name); - buf = taosDecodeFixedU8(buf, &pReq->type); - return buf; -} - int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq) { SCoder encoder = {0}; tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); @@ -3804,6 +3789,120 @@ int tDecodeSVCreateTbRsp(SCoder *pCoder, SVCreateTbRsp *pRsp) { if (tDecodeI32(pCoder, &pRsp->code) < 0) return -1; + tEndDecode(pCoder); + return 0; +} + +// TDMT_VND_DROP_TABLE ================= +static int32_t tEncodeSVDropTbReq(SCoder *pCoder, const SVDropTbReq *pReq) { + if (tStartEncode(pCoder) < 0) return -1; + + if (tEncodeCStr(pCoder, pReq->name) < 0) return -1; + + tEndEncode(pCoder); + return 0; +} + +static int32_t tDecodeSVDropTbReq(SCoder *pCoder, SVDropTbReq *pReq) { + if (tStartDecode(pCoder) < 0) return -1; + + if (tDecodeCStr(pCoder, &pReq->name) < 0) return -1; + + tEndDecode(pCoder); + return 0; +} + +static int32_t tEncodeSVDropTbRsp(SCoder *pCoder, const SVDropTbRsp *pReq) { + if (tStartEncode(pCoder) < 0) return -1; + + if (tEncodeI32(pCoder, pReq->code) < 0) return -1; + + tEndEncode(pCoder); + return 0; +} + +static int32_t tDecodeSVDropTbRsp(SCoder *pCoder, SVDropTbRsp *pReq) { + if (tStartDecode(pCoder) < 0) return -1; + + if (tDecodeI32(pCoder, &pReq->code) < 0) return -1; + + tEndDecode(pCoder); + return 0; +} + +int32_t tEncodeSVDropTbBatchReq(SCoder *pCoder, const SVDropTbBatchReq *pReq) { + int32_t nReqs = taosArrayGetSize(pReq->pArray); + SVDropTbReq *pDropTbReq; + + if (tStartEncode(pCoder) < 0) return -1; + + if (tEncodeI32v(pCoder, nReqs) < 0) return -1; + for (int iReq = 0; iReq < nReqs; iReq++) { + pDropTbReq = (SVDropTbReq *)taosArrayGet(pReq->pArray, iReq); + if (tEncodeSVDropTbReq(pCoder, pDropTbReq) < 0) return -1; + } + + tEndEncode(pCoder); + return 0; +} + +int32_t tDecodeSVDropTbBatchReq(SCoder *pCoder, SVDropTbBatchReq *pReq) { + if (tStartDecode(pCoder) < 0) return -1; + + if (tDecodeI32v(pCoder, &pReq->nReqs) < 0) return -1; + pReq->pReqs = (SVDropTbReq *)tCoderMalloc(pCoder, sizeof(SVDropTbReq) * pReq->nReqs); + if (pReq->pReqs == NULL) return -1; + for (int iReq = 0; iReq < pReq->nReqs; iReq++) { + if (tDecodeSVDropTbReq(pCoder, pReq->pReqs + iReq) < 0) return -1; + } + + tEndDecode(pCoder); + return 0; +} + +int32_t tEncodeSVDropTbBatchRsp(SCoder *pCoder, const SVDropTbBatchRsp *pRsp) { + int32_t nRsps = taosArrayGetSize(pRsp->pArray); + if (tStartEncode(pCoder) < 0) return -1; + + if (tEncodeI32v(pCoder, nRsps) < 0) return -1; + for (int iRsp = 0; iRsp < nRsps; iRsp++) { + if (tEncodeSVDropTbRsp(pCoder, (SVDropTbRsp *)taosArrayGet(pRsp->pArray, iRsp)) < 0) return -1; + } + + tEndEncode(pCoder); + return 0; +} + +int32_t tDecodeSVDropTbBatchRsp(SCoder *pCoder, SVDropTbBatchRsp *pRsp) { + if (tStartDecode(pCoder) < 0) return -1; + + if (tDecodeI32v(pCoder, &pRsp->nRsps) < 0) return -1; + pRsp->pRsps = (SVDropTbRsp *)tCoderMalloc(pCoder, sizeof(SVDropTbRsp) * pRsp->nRsps); + if (pRsp->pRsps == NULL) return -1; + for (int iRsp = 0; iRsp < pRsp->nRsps; iRsp++) { + if (tDecodeSVDropTbRsp(pCoder, pRsp->pRsps + iRsp) < 0) return -1; + } + + tEndDecode(pCoder); + return 0; +} + +int32_t tEncodeSVDropStbReq(SCoder *pCoder, const SVDropStbReq *pReq) { + if (tStartEncode(pCoder) < 0) return -1; + + if (tEncodeCStr(pCoder, pReq->name) < 0) return -1; + if (tEncodeI64(pCoder, pReq->suid) < 0) return -1; + + tEndEncode(pCoder); + return 0; +} + +int32_t tDecodeSVDropStbReq(SCoder *pCoder, SVDropStbReq *pReq) { + if (tStartDecode(pCoder) < 0) return -1; + + if (tDecodeCStr(pCoder, &pReq->name) < 0) return -1; + if (tDecodeI64(pCoder, &pReq->suid) < 0) return -1; + tEndDecode(pCoder); return 0; } \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 41bcef25d1b671b60f1c0e1311e025783e1ac5a2..faaaac21af4a708de58550935c0e827f2e934c83 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -428,17 +428,23 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt } static void *mndBuildVDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) { - SName name = {0}; + SName name = {0}; + SVDropStbReq req = {0}; + int32_t contLen = 0; + int32_t ret = 0; + SMsgHead *pHead = NULL; + SCoder coder = {0}; + tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - SVDropTbReq req = {0}; - req.ver = 0; req.name = (char *)tNameGetTableName(&name); - req.type = TD_SUPER_TABLE; req.suid = pStb->uid; - int32_t contLen = tSerializeSVDropTbReq(NULL, &req) + sizeof(SMsgHead); - SMsgHead *pHead = taosMemoryMalloc(contLen); + tEncodeSize(tEncodeSVDropStbReq, &req, contLen, ret); + if (ret < 0) return NULL; + + contLen += sizeof(SMsgHead); + pHead = taosMemoryMalloc(contLen); if (pHead == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; @@ -448,7 +454,10 @@ static void *mndBuildVDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, pHead->vgId = htonl(pVgroup->vgId); void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead)); - tSerializeSVDropTbReq(&pBuf, &req); + + tCoderInit(&coder, TD_LITTLE_ENDIAN, pBuf, contLen - sizeof(SMsgHead), TD_ENCODER); + tEncodeSVDropStbReq(&coder, &req); + tCoderClear(&coder); *pContLen = contLen; return pHead; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 7f7023fccd676d9ed61341b64f2625cd7bd53238..52013cdbc356a71bf6a3aba0046422c3f8f40fc4 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -17,10 +17,10 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp); static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp); -static int vnodeProcessDropStbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp); +static int vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp); static int vnodeProcessAlterTbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp); -static int vnodeProcessDropTbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp); +static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version) { @@ -76,7 +76,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg if (vnodeProcessAlterStbReq(pVnode, pReq, len, pRsp) < 0) goto _err; break; case TDMT_VND_DROP_STB: - if (vnodeProcessDropStbReq(pVnode, pReq, len, pRsp) < 0) goto _err; + if (vnodeProcessDropStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; break; case TDMT_VND_CREATE_TABLE: if (vnodeProcessCreateTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; @@ -85,7 +85,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg if (vnodeProcessAlterTbReq(pVnode, pReq, len, pRsp) < 0) goto _err; break; case TDMT_VND_DROP_TABLE: - if (vnodeProcessDropTbReq(pVnode, pReq, len, pRsp) < 0) goto _err; + if (vnodeProcessDropTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; break; case TDMT_VND_CREATE_SMA: { // timeRangeSMA if (tsdbCreateTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { @@ -408,9 +408,8 @@ static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq, int32_t len, SRpc return 0; } -static int vnodeProcessDropStbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp) { +static int vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { // TODO - // ASSERT(0); return 0; } @@ -420,9 +419,15 @@ static int vnodeProcessAlterTbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcM return 0; } -static int vnodeProcessDropTbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp) { - // TODO - ASSERT(0); +static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { + SVDropTbReq req = {0}; + SVDropTbReq rsp = {0}; + + // decode req + + // process req + + // return rsp return 0; }