提交 0eb7890d 编写于 作者: H Hongze Cheng

add drop table msg

上级 41162f28
......@@ -1524,6 +1524,7 @@ typedef struct {
int32_t tEncodeSRSmaParam(SCoder* pCoder, const SRSmaParam* pRSmaParam);
int32_t tDecodeSRSmaParam(SCoder* pCoder, SRSmaParam* pRSmaParam);
// TDMT_VND_CREATE_STB ==============
typedef struct SVCreateStbReq {
const char* name;
tb_uid_t suid;
......@@ -1536,17 +1537,14 @@ typedef struct SVCreateStbReq {
int tEncodeSVCreateStbReq(SCoder* pCoder, const SVCreateStbReq* pReq);
int tDecodeSVCreateStbReq(SCoder* pCoder, SVCreateStbReq* pReq);
// TDMT_VND_DROP_STB ==============
typedef struct SVDropStbReq {
// data
#ifdef WINDOWS
size_t avoidCompilationErrors;
#endif
const char* name;
tb_uid_t suid;
} SVDropStbReq;
typedef struct SVCreateStbRsp {
int code;
} SVCreateStbRsp;
int32_t tEncodeSVDropStbReq(SCoder* pCoder, const SVDropStbReq* pReq);
int32_t tDecodeSVDropStbReq(SCoder* pCoder, SVDropStbReq* pReq);
typedef struct SVCreateTbReq {
tb_uid_t uid;
......@@ -1603,19 +1601,36 @@ int tDecodeSVCreateTbBatchRsp(SCoder* pCoder, SVCreateTbBatchRsp* pRsp);
int32_t tSerializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatchRsp* pRsp);
int32_t tDeserializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatchRsp* pRsp);
// TDMT_VND_DROP_TABLE =================
typedef struct {
int64_t ver;
char* name;
uint8_t type;
tb_uid_t suid;
const char* name;
} SVDropTbReq;
typedef struct {
int tmp; // TODO: to avoid compile error
int32_t code;
} SVDropTbRsp;
int32_t tSerializeSVDropTbReq(void** buf, SVDropTbReq* pReq);
void* tDeserializeSVDropTbReq(void* buf, SVDropTbReq* pReq);
typedef struct {
int32_t nReqs;
union {
SVDropTbReq* pReqs;
SArray* pArray;
};
} SVDropTbBatchReq;
int32_t tEncodeSVDropTbBatchReq(SCoder* pCoder, const SVDropTbBatchReq* pReq);
int32_t tDecodeSVDropTbBatchReq(SCoder* pCoder, SVDropTbBatchReq* pReq);
typedef struct {
int32_t nRsps;
union {
SVDropTbRsp* pRsps;
SArray* pArray;
};
} SVDropTbBatchRsp;
int32_t tEncodeSVDropTbBatchRsp(SCoder* pCoder, const SVDropTbBatchRsp* pRsp);
int32_t tDecodeSVDropTbBatchRsp(SCoder* pCoder, SVDropTbBatchRsp* pRsp);
typedef struct {
SMsgHead head;
......
......@@ -170,9 +170,9 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_UPDATE_TAG_VAL, "vnode-update-tag-val", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TABLE_META, "vnode-table-meta", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TABLES_META, "vnode-tables-meta", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_STB, "vnode-create-stb", SVCreateTbReq, SVCreateTbRsp)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_STB, "vnode-create-stb", SVCreateStbReq, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_STB, "vnode-alter-stb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_STB, "vnode-drop-stb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_STB, "vnode-drop-stb", SVDropStbReq, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONSUME, "vnode-mq-consume", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_QUERY, "vnode-mq-query", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONNECT, "vnode-mq-connect", NULL, NULL)
......
......@@ -79,17 +79,6 @@ typedef struct {
#define TD_CODER_CURRENT(CODER) ((CODER)->data + (CODER)->pos)
#define TD_CODER_MOVE_POS(CODER, MOVE) ((CODER)->pos += (MOVE))
#define TD_CODER_CHECK_CAPACITY_FAILED(CODER, EXPSIZE) (((CODER)->size - (CODER)->pos) < (EXPSIZE))
// #define TCODER_MALLOC(PCODER, SIZE) \
// ({ \
// void* ptr = NULL; \
// SCoderMem* pMem = (SCoderMem*)taosMemoryMalloc(sizeof(*pMem) + (SIZE)); \
// if (pMem) { \
// pMem->next = (PCODER)->mList; \
// (PCODER)->mList = pMem; \
// ptr = (void*)&pMem[1]; \
// } \
// ptr; \
// })
static FORCE_INLINE void* tCoderMalloc(SCoder* pCoder, int32_t size) {
void* ptr = NULL;
SCoderMem* pMem = (SCoderMem*)taosMemoryMalloc(sizeof(SCoderMem*) + size);
......@@ -102,8 +91,9 @@ static FORCE_INLINE void* tCoderMalloc(SCoder* pCoder, int32_t size) {
}
#define tEncodeSize(E, S, SIZE, RET) \
do{ \
do { \
SCoder coder = {0}; \
RET = 0; \
tCoderInit(&coder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); \
if ((E)(&coder, S) == 0) { \
SIZE = coder.pos; \
......@@ -111,7 +101,7 @@ static FORCE_INLINE void* tCoderMalloc(SCoder* pCoder, int32_t size) {
RET = -1; \
} \
tCoderClear(&coder); \
}while(0)
} while (0)
// #define tEncodeSize(E, S, SIZE) \
// ({ \
// SCoder coder = {0}; \
......
......@@ -490,21 +490,6 @@ int32_t tDeserializeSClientHbBatchRsp(void *buf, int32_t bufLen, SClientHbBatchR
return 0;
}
int32_t tSerializeSVDropTbReq(void **buf, SVDropTbReq *pReq) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pReq->ver);
tlen += taosEncodeString(buf, pReq->name);
tlen += taosEncodeFixedU8(buf, pReq->type);
return tlen;
}
void *tDeserializeSVDropTbReq(void *buf, SVDropTbReq *pReq) {
buf = taosDecodeFixedI64(buf, &pReq->ver);
buf = taosDecodeString(buf, &pReq->name);
buf = taosDecodeFixedU8(buf, &pReq->type);
return buf;
}
int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
......@@ -3804,6 +3789,120 @@ int tDecodeSVCreateTbRsp(SCoder *pCoder, SVCreateTbRsp *pRsp) {
if (tDecodeI32(pCoder, &pRsp->code) < 0) return -1;
tEndDecode(pCoder);
return 0;
}
// TDMT_VND_DROP_TABLE =================
static int32_t tEncodeSVDropTbReq(SCoder *pCoder, const SVDropTbReq *pReq) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeCStr(pCoder, pReq->name) < 0) return -1;
tEndEncode(pCoder);
return 0;
}
static int32_t tDecodeSVDropTbReq(SCoder *pCoder, SVDropTbReq *pReq) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeCStr(pCoder, &pReq->name) < 0) return -1;
tEndDecode(pCoder);
return 0;
}
static int32_t tEncodeSVDropTbRsp(SCoder *pCoder, const SVDropTbRsp *pReq) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI32(pCoder, pReq->code) < 0) return -1;
tEndEncode(pCoder);
return 0;
}
static int32_t tDecodeSVDropTbRsp(SCoder *pCoder, SVDropTbRsp *pReq) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI32(pCoder, &pReq->code) < 0) return -1;
tEndDecode(pCoder);
return 0;
}
int32_t tEncodeSVDropTbBatchReq(SCoder *pCoder, const SVDropTbBatchReq *pReq) {
int32_t nReqs = taosArrayGetSize(pReq->pArray);
SVDropTbReq *pDropTbReq;
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI32v(pCoder, nReqs) < 0) return -1;
for (int iReq = 0; iReq < nReqs; iReq++) {
pDropTbReq = (SVDropTbReq *)taosArrayGet(pReq->pArray, iReq);
if (tEncodeSVDropTbReq(pCoder, pDropTbReq) < 0) return -1;
}
tEndEncode(pCoder);
return 0;
}
int32_t tDecodeSVDropTbBatchReq(SCoder *pCoder, SVDropTbBatchReq *pReq) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI32v(pCoder, &pReq->nReqs) < 0) return -1;
pReq->pReqs = (SVDropTbReq *)tCoderMalloc(pCoder, sizeof(SVDropTbReq) * pReq->nReqs);
if (pReq->pReqs == NULL) return -1;
for (int iReq = 0; iReq < pReq->nReqs; iReq++) {
if (tDecodeSVDropTbReq(pCoder, pReq->pReqs + iReq) < 0) return -1;
}
tEndDecode(pCoder);
return 0;
}
int32_t tEncodeSVDropTbBatchRsp(SCoder *pCoder, const SVDropTbBatchRsp *pRsp) {
int32_t nRsps = taosArrayGetSize(pRsp->pArray);
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI32v(pCoder, nRsps) < 0) return -1;
for (int iRsp = 0; iRsp < nRsps; iRsp++) {
if (tEncodeSVDropTbRsp(pCoder, (SVDropTbRsp *)taosArrayGet(pRsp->pArray, iRsp)) < 0) return -1;
}
tEndEncode(pCoder);
return 0;
}
int32_t tDecodeSVDropTbBatchRsp(SCoder *pCoder, SVDropTbBatchRsp *pRsp) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI32v(pCoder, &pRsp->nRsps) < 0) return -1;
pRsp->pRsps = (SVDropTbRsp *)tCoderMalloc(pCoder, sizeof(SVDropTbRsp) * pRsp->nRsps);
if (pRsp->pRsps == NULL) return -1;
for (int iRsp = 0; iRsp < pRsp->nRsps; iRsp++) {
if (tDecodeSVDropTbRsp(pCoder, pRsp->pRsps + iRsp) < 0) return -1;
}
tEndDecode(pCoder);
return 0;
}
int32_t tEncodeSVDropStbReq(SCoder *pCoder, const SVDropStbReq *pReq) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeCStr(pCoder, pReq->name) < 0) return -1;
if (tEncodeI64(pCoder, pReq->suid) < 0) return -1;
tEndEncode(pCoder);
return 0;
}
int32_t tDecodeSVDropStbReq(SCoder *pCoder, SVDropStbReq *pReq) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeCStr(pCoder, &pReq->name) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->suid) < 0) return -1;
tEndDecode(pCoder);
return 0;
}
\ No newline at end of file
......@@ -428,17 +428,23 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
}
static void *mndBuildVDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) {
SName name = {0};
SName name = {0};
SVDropStbReq req = {0};
int32_t contLen = 0;
int32_t ret = 0;
SMsgHead *pHead = NULL;
SCoder coder = {0};
tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
SVDropTbReq req = {0};
req.ver = 0;
req.name = (char *)tNameGetTableName(&name);
req.type = TD_SUPER_TABLE;
req.suid = pStb->uid;
int32_t contLen = tSerializeSVDropTbReq(NULL, &req) + sizeof(SMsgHead);
SMsgHead *pHead = taosMemoryMalloc(contLen);
tEncodeSize(tEncodeSVDropStbReq, &req, contLen, ret);
if (ret < 0) return NULL;
contLen += sizeof(SMsgHead);
pHead = taosMemoryMalloc(contLen);
if (pHead == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
......@@ -448,7 +454,10 @@ static void *mndBuildVDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb,
pHead->vgId = htonl(pVgroup->vgId);
void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
tSerializeSVDropTbReq(&pBuf, &req);
tCoderInit(&coder, TD_LITTLE_ENDIAN, pBuf, contLen - sizeof(SMsgHead), TD_ENCODER);
tEncodeSVDropStbReq(&coder, &req);
tCoderClear(&coder);
*pContLen = contLen;
return pHead;
......
......@@ -17,10 +17,10 @@
static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp);
static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessDropStbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp);
static int vnodeProcessAlterTbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessDropTbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version) {
......@@ -76,7 +76,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
if (vnodeProcessAlterStbReq(pVnode, pReq, len, pRsp) < 0) goto _err;
break;
case TDMT_VND_DROP_STB:
if (vnodeProcessDropStbReq(pVnode, pReq, len, pRsp) < 0) goto _err;
if (vnodeProcessDropStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
break;
case TDMT_VND_CREATE_TABLE:
if (vnodeProcessCreateTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
......@@ -85,7 +85,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
if (vnodeProcessAlterTbReq(pVnode, pReq, len, pRsp) < 0) goto _err;
break;
case TDMT_VND_DROP_TABLE:
if (vnodeProcessDropTbReq(pVnode, pReq, len, pRsp) < 0) goto _err;
if (vnodeProcessDropTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
break;
case TDMT_VND_CREATE_SMA: { // timeRangeSMA
if (tsdbCreateTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
......@@ -408,9 +408,8 @@ static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq, int32_t len, SRpc
return 0;
}
static int vnodeProcessDropStbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp) {
static int vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
// TODO
// ASSERT(0);
return 0;
}
......@@ -420,9 +419,15 @@ static int vnodeProcessAlterTbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcM
return 0;
}
static int vnodeProcessDropTbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp) {
// TODO
ASSERT(0);
static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SVDropTbReq req = {0};
SVDropTbReq rsp = {0};
// decode req
// process req
// return rsp
return 0;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册