From 411b9a2d33b9d27d2a6da3d9573a1c77fbdd0099 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 23 Mar 2022 13:50:53 +0800 Subject: [PATCH] add tsma --- include/common/tmsg.h | 29 +++++++++ include/common/tmsgdef.h | 2 + source/common/src/tmsg.c | 93 ++++++++++++++++++++++++++++ source/dnode/mnode/impl/inc/mndDef.h | 46 +++++++++----- source/dnode/mnode/impl/src/mndStb.c | 42 ++++++++++++- 5 files changed, 195 insertions(+), 17 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index ca4e6b97a4..33e5431333 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1897,6 +1897,35 @@ static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW) } return buf; } + +typedef struct { + char name[TSDB_TABLE_FNAME_LEN]; + char stb[TSDB_TABLE_FNAME_LEN]; + int8_t igExists; + int8_t intervalUnit; + int8_t slidingUnit; + int8_t timezone; + int64_t interval; + int64_t offset; + int64_t sliding; + int32_t exprLen; + int32_t tagsFilterLen; + char* expr; + char* tagsFilter; +} SMCreateTSmaReq; + +int32_t tSerializeSMCreateTSmaReq(void* buf, int32_t bufLen, SMCreateTSmaReq* pReq); +int32_t tDeserializeSMCreateTSmaReq(void* buf, int32_t bufLen, SMCreateTSmaReq* pReq); +void tFreeSMCreateTSmaReq(SMCreateTSmaReq* pReq); + +typedef struct { + char name[TSDB_TABLE_FNAME_LEN]; + int8_t igNotExists; +} SMDropTSmaReq; + +int32_t tSerializeSMDropTSmaReq(void* buf, int32_t bufLen, SMDropTSmaReq* pReq); +int32_t tDeserializeSMDropTSmaReq(void* buf, int32_t bufLen, SMDropTSmaReq* pReq); + typedef struct { int8_t version; // for compatibility(default 0) int8_t intervalUnit; // MACRO: TIME_UNIT_XXX diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 73a78131dc..c707ec847f 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -127,6 +127,8 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_CREATE_STB, "mnode-create-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_STB, "mnode-alter-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_STB, "mnode-drop-stb", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_CREATE_SMA, "mnode-create-tsma", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_DROP_SMA, "mnode-drop-tsma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TABLE_META, "mnode-table-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_VGROUP_LIST, "mnode-vgroup-list", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_QNODE_LIST, "mnode-qnode-list", NULL, NULL) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index b4caf5ba97..ece2332b64 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -589,6 +589,99 @@ void tFreeSMAltertbReq(SMAltertbReq *pReq) { pReq->pFields = NULL; } +int32_t tSerializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateTSmaReq *pReq) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->stb) < 0) return -1; + if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1; + if (tEncodeI8(&encoder, pReq->intervalUnit) < 0) return -1; + if (tEncodeI8(&encoder, pReq->slidingUnit) < 0) return -1; + if (tEncodeI8(&encoder, pReq->timezone) < 0) return -1; + if (tEncodeI64(&encoder, pReq->interval) < 0) return -1; + if (tEncodeI64(&encoder, pReq->offset) < 0) return -1; + if (tEncodeI64(&encoder, pReq->sliding) < 0) return -1; + if (tEncodeI32(&encoder, pReq->exprLen) < 0) return -1; + if (pReq->exprLen > 0) { + if (tEncodeBinary(&encoder, pReq->expr, pReq->exprLen) < 0) return -1; + } + if (tEncodeI32(&encoder, pReq->tagsFilterLen) < 0) return -1; + if (pReq->tagsFilterLen > 0) { + if (tEncodeBinary(&encoder, pReq->tagsFilter, pReq->tagsFilterLen) < 0) return -1; + } + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateTSmaReq *pReq) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->stb) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->intervalUnit) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->slidingUnit) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->timezone) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->interval) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->offset) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->sliding) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->exprLen) < 0) return -1; + if (pReq->exprLen > 0) { + pReq->expr = malloc(pReq->exprLen); + if (pReq->expr == NULL) return -1; + if (tDecodeCStrTo(&decoder, pReq->expr) < 0) return -1; + } + if (tDecodeI32(&decoder, &pReq->tagsFilterLen) < 0) return -1; + if (pReq->tagsFilterLen > 0) { + pReq->tagsFilter = malloc(pReq->tagsFilterLen); + if (pReq->tagsFilter == NULL) return -1; + if (tDecodeCStrTo(&decoder, pReq->tagsFilter) < 0) return -1; + } + + tEndDecode(&decoder); + tCoderClear(&decoder); + return 0; +} + +void tFreeSMCreateSmaReq(SMCreateTSmaReq *pReq) { + tfree(pReq->expr); + tfree(pReq->tagsFilter); +} + +int32_t tSerializeSMDropSmaReq(void *buf, int32_t bufLen, SMDropTSmaReq *pReq) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; + if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSMDropSmaReq(void *buf, int32_t bufLen, SMDropTSmaReq *pReq) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1; + tEndDecode(&decoder); + + tCoderClear(&decoder); + return 0; +} + int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { SCoder encoder = {0}; tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 909486aaac..79c110af94 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -306,20 +306,38 @@ typedef struct { } SVgObj; typedef struct { - char name[TSDB_TABLE_FNAME_LEN]; - char db[TSDB_DB_FNAME_LEN]; - int64_t createdTime; - int64_t updateTime; - int64_t uid; - int64_t dbUid; - int32_t version; - int32_t nextColId; - int32_t numOfColumns; - int32_t numOfTags; - SSchema* pColumns; - SSchema* pTags; - SRWLatch lock; - char comment[TSDB_STB_COMMENT_LEN]; + char name[TSDB_TABLE_FNAME_LEN]; + int64_t createdTime; + int64_t uid; + int8_t intervalUnit; + int8_t slidingUnit; + int8_t timezone; + int64_t interval; + int64_t offset; + int64_t sliding; + int32_t exprLen; + int32_t tagsFilterLen; + char* expr; + char* tagsFilter; +} STSmaObj; + +typedef struct { + char name[TSDB_TABLE_FNAME_LEN]; + char db[TSDB_DB_FNAME_LEN]; + int64_t createdTime; + int64_t updateTime; + int64_t uid; + int64_t dbUid; + int32_t version; + int32_t nextColId; + int32_t numOfColumns; + int32_t numOfTags; + int32_t numOfTSmas; + SSchema* pColumns; + SSchema* pTags; + STSmaObj* pTSmas; + SRWLatch lock; + char comment[TSDB_STB_COMMENT_LEN]; } SStbObj; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 121720f48f..8ad3135ab0 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -18,15 +18,15 @@ #include "mndAuth.h" #include "mndDb.h" #include "mndDnode.h" +#include "mndInfoSchema.h" #include "mndMnode.h" #include "mndShow.h" #include "mndTrans.h" #include "mndUser.h" #include "mndVgroup.h" -#include "mndInfoSchema.h" #include "tname.h" -#define TSDB_STB_VER_NUMBER 1 +#define TSDB_STB_VER_NUMBER 1 #define TSDB_STB_RESERVE_SIZE 64 static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw); @@ -88,6 +88,7 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) { SDB_SET_INT32(pRaw, dataPos, pStb->nextColId, STB_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pStb->numOfColumns, STB_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pStb->numOfTags, STB_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pStb->numOfTSmas, STB_ENCODE_OVER) for (int32_t i = 0; i < pStb->numOfColumns; ++i) { SSchema *pSchema = &pStb->pColumns[i]; @@ -105,6 +106,23 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) { SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_ENCODE_OVER) } + for (int32_t i = 0; i < pStb->numOfTSmas; ++i) { + STSmaObj *pTSma = &pStb->pTSmas[i]; + SDB_SET_BINARY(pRaw, dataPos, pTSma->name, TSDB_TABLE_FNAME_LEN, STB_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pTSma->createdTime, STB_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pTSma->uid, STB_ENCODE_OVER) + SDB_SET_INT8(pRaw, dataPos, pTSma->intervalUnit, STB_ENCODE_OVER) + SDB_SET_INT8(pRaw, dataPos, pTSma->slidingUnit, STB_ENCODE_OVER) + SDB_SET_INT8(pRaw, dataPos, pTSma->timezone, STB_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pTSma->interval, STB_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pTSma->offset, STB_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pTSma->sliding, STB_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pTSma->exprLen, STB_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pTSma->tagsFilterLen, STB_ENCODE_OVER) + SDB_SET_BINARY(pRaw, dataPos, pTSma->expr, pTSma->exprLen, STB_ENCODE_OVER) + SDB_SET_BINARY(pRaw, dataPos, pTSma->tagsFilter, pTSma->tagsFilterLen, STB_ENCODE_OVER) + } + SDB_SET_BINARY(pRaw, dataPos, pStb->comment, TSDB_STB_COMMENT_LEN, STB_ENCODE_OVER) SDB_SET_RESERVE(pRaw, dataPos, TSDB_STB_RESERVE_SIZE, STB_ENCODE_OVER) SDB_SET_DATALEN(pRaw, dataPos, STB_ENCODE_OVER) @@ -150,6 +168,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) { SDB_GET_INT32(pRaw, dataPos, &pStb->nextColId, STB_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->numOfColumns, STB_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->numOfTags, STB_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pStb->numOfTSmas, STB_DECODE_OVER) pStb->pColumns = calloc(pStb->numOfColumns, sizeof(SSchema)); pStb->pTags = calloc(pStb->numOfTags, sizeof(SSchema)); @@ -173,6 +192,23 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) { SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_DECODE_OVER) } + for (int32_t i = 0; i < pStb->numOfTSmas; ++i) { + STSmaObj *pTSma = &pStb->pTSmas[i]; + SDB_GET_BINARY(pRaw, dataPos, pTSma->name, TSDB_TABLE_FNAME_LEN, STB_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pTSma->createdTime, STB_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pTSma->uid, STB_DECODE_OVER) + SDB_GET_INT8(pRaw, dataPos, &pTSma->intervalUnit, STB_DECODE_OVER) + SDB_GET_INT8(pRaw, dataPos, &pTSma->slidingUnit, STB_DECODE_OVER) + SDB_GET_INT8(pRaw, dataPos, &pTSma->timezone, STB_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pTSma->interval, STB_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pTSma->offset, STB_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pTSma->sliding, STB_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pTSma->exprLen, STB_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pTSma->tagsFilterLen, STB_DECODE_OVER) + SDB_GET_BINARY(pRaw, dataPos, pTSma->expr, pTSma->exprLen, STB_DECODE_OVER) + SDB_GET_BINARY(pRaw, dataPos, pTSma->tagsFilter, pTSma->tagsFilterLen, STB_DECODE_OVER) + } + SDB_GET_BINARY(pRaw, dataPos, pStb->comment, TSDB_STB_COMMENT_LEN, STB_DECODE_OVER) SDB_GET_RESERVE(pRaw, dataPos, TSDB_STB_RESERVE_SIZE, STB_DECODE_OVER) @@ -1162,7 +1198,7 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj * static int32_t mndDropStb(SMnode *pMnode, SNodeMsg *pReq, SDbObj *pDb, SStbObj *pStb) { int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK,TRN_TYPE_DROP_STB, &pReq->rpcMsg); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_STB, &pReq->rpcMsg); if (pTrans == NULL) goto DROP_STB_OVER; mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name); -- GitLab