提交 411b9a2d 编写于 作者: S Shengliang Guan

add tsma

上级 20017d82
...@@ -1897,6 +1897,35 @@ static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW) ...@@ -1897,6 +1897,35 @@ static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW)
} }
return buf; return buf;
} }
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
char stb[TSDB_TABLE_FNAME_LEN];
int8_t igExists;
int8_t intervalUnit;
int8_t slidingUnit;
int8_t timezone;
int64_t interval;
int64_t offset;
int64_t sliding;
int32_t exprLen;
int32_t tagsFilterLen;
char* expr;
char* tagsFilter;
} SMCreateTSmaReq;
int32_t tSerializeSMCreateTSmaReq(void* buf, int32_t bufLen, SMCreateTSmaReq* pReq);
int32_t tDeserializeSMCreateTSmaReq(void* buf, int32_t bufLen, SMCreateTSmaReq* pReq);
void tFreeSMCreateTSmaReq(SMCreateTSmaReq* pReq);
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
int8_t igNotExists;
} SMDropTSmaReq;
int32_t tSerializeSMDropTSmaReq(void* buf, int32_t bufLen, SMDropTSmaReq* pReq);
int32_t tDeserializeSMDropTSmaReq(void* buf, int32_t bufLen, SMDropTSmaReq* pReq);
typedef struct { typedef struct {
int8_t version; // for compatibility(default 0) int8_t version; // for compatibility(default 0)
int8_t intervalUnit; // MACRO: TIME_UNIT_XXX int8_t intervalUnit; // MACRO: TIME_UNIT_XXX
......
...@@ -127,6 +127,8 @@ enum { ...@@ -127,6 +127,8 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_STB, "mnode-create-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_STB, "mnode-create-stb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_STB, "mnode-alter-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_STB, "mnode-alter-stb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_STB, "mnode-drop-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_STB, "mnode-drop-stb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_SMA, "mnode-create-tsma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_SMA, "mnode-drop-tsma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TABLE_META, "mnode-table-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TABLE_META, "mnode-table-meta", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_VGROUP_LIST, "mnode-vgroup-list", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_VGROUP_LIST, "mnode-vgroup-list", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_QNODE_LIST, "mnode-qnode-list", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_QNODE_LIST, "mnode-qnode-list", NULL, NULL)
......
...@@ -589,6 +589,99 @@ void tFreeSMAltertbReq(SMAltertbReq *pReq) { ...@@ -589,6 +589,99 @@ void tFreeSMAltertbReq(SMAltertbReq *pReq) {
pReq->pFields = NULL; pReq->pFields = NULL;
} }
int32_t tSerializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateTSmaReq *pReq) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->stb) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1;
if (tEncodeI8(&encoder, pReq->intervalUnit) < 0) return -1;
if (tEncodeI8(&encoder, pReq->slidingUnit) < 0) return -1;
if (tEncodeI8(&encoder, pReq->timezone) < 0) return -1;
if (tEncodeI64(&encoder, pReq->interval) < 0) return -1;
if (tEncodeI64(&encoder, pReq->offset) < 0) return -1;
if (tEncodeI64(&encoder, pReq->sliding) < 0) return -1;
if (tEncodeI32(&encoder, pReq->exprLen) < 0) return -1;
if (pReq->exprLen > 0) {
if (tEncodeBinary(&encoder, pReq->expr, pReq->exprLen) < 0) return -1;
}
if (tEncodeI32(&encoder, pReq->tagsFilterLen) < 0) return -1;
if (pReq->tagsFilterLen > 0) {
if (tEncodeBinary(&encoder, pReq->tagsFilter, pReq->tagsFilterLen) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateTSmaReq *pReq) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->stb) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->intervalUnit) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->slidingUnit) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->timezone) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->interval) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->offset) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->sliding) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->exprLen) < 0) return -1;
if (pReq->exprLen > 0) {
pReq->expr = malloc(pReq->exprLen);
if (pReq->expr == NULL) return -1;
if (tDecodeCStrTo(&decoder, pReq->expr) < 0) return -1;
}
if (tDecodeI32(&decoder, &pReq->tagsFilterLen) < 0) return -1;
if (pReq->tagsFilterLen > 0) {
pReq->tagsFilter = malloc(pReq->tagsFilterLen);
if (pReq->tagsFilter == NULL) return -1;
if (tDecodeCStrTo(&decoder, pReq->tagsFilter) < 0) return -1;
}
tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
}
void tFreeSMCreateSmaReq(SMCreateTSmaReq *pReq) {
tfree(pReq->expr);
tfree(pReq->tagsFilter);
}
int32_t tSerializeSMDropSmaReq(void *buf, int32_t bufLen, SMDropTSmaReq *pReq) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSMDropSmaReq(void *buf, int32_t bufLen, SMDropTSmaReq *pReq) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1;
tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
}
int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
SCoder encoder = {0}; SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
......
...@@ -306,20 +306,38 @@ typedef struct { ...@@ -306,20 +306,38 @@ typedef struct {
} SVgObj; } SVgObj;
typedef struct { typedef struct {
char name[TSDB_TABLE_FNAME_LEN]; char name[TSDB_TABLE_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN]; int64_t createdTime;
int64_t createdTime; int64_t uid;
int64_t updateTime; int8_t intervalUnit;
int64_t uid; int8_t slidingUnit;
int64_t dbUid; int8_t timezone;
int32_t version; int64_t interval;
int32_t nextColId; int64_t offset;
int32_t numOfColumns; int64_t sliding;
int32_t numOfTags; int32_t exprLen;
SSchema* pColumns; int32_t tagsFilterLen;
SSchema* pTags; char* expr;
SRWLatch lock; char* tagsFilter;
char comment[TSDB_STB_COMMENT_LEN]; } STSmaObj;
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
int64_t createdTime;
int64_t updateTime;
int64_t uid;
int64_t dbUid;
int32_t version;
int32_t nextColId;
int32_t numOfColumns;
int32_t numOfTags;
int32_t numOfTSmas;
SSchema* pColumns;
SSchema* pTags;
STSmaObj* pTSmas;
SRWLatch lock;
char comment[TSDB_STB_COMMENT_LEN];
} SStbObj; } SStbObj;
typedef struct { typedef struct {
......
...@@ -18,15 +18,15 @@ ...@@ -18,15 +18,15 @@
#include "mndAuth.h" #include "mndAuth.h"
#include "mndDb.h" #include "mndDb.h"
#include "mndDnode.h" #include "mndDnode.h"
#include "mndInfoSchema.h"
#include "mndMnode.h" #include "mndMnode.h"
#include "mndShow.h" #include "mndShow.h"
#include "mndTrans.h" #include "mndTrans.h"
#include "mndUser.h" #include "mndUser.h"
#include "mndVgroup.h" #include "mndVgroup.h"
#include "mndInfoSchema.h"
#include "tname.h" #include "tname.h"
#define TSDB_STB_VER_NUMBER 1 #define TSDB_STB_VER_NUMBER 1
#define TSDB_STB_RESERVE_SIZE 64 #define TSDB_STB_RESERVE_SIZE 64
static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw); static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw);
...@@ -88,6 +88,7 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) { ...@@ -88,6 +88,7 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
SDB_SET_INT32(pRaw, dataPos, pStb->nextColId, STB_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pStb->nextColId, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->numOfColumns, STB_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pStb->numOfColumns, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->numOfTags, STB_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pStb->numOfTags, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->numOfTSmas, STB_ENCODE_OVER)
for (int32_t i = 0; i < pStb->numOfColumns; ++i) { for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
SSchema *pSchema = &pStb->pColumns[i]; SSchema *pSchema = &pStb->pColumns[i];
...@@ -105,6 +106,23 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) { ...@@ -105,6 +106,23 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_ENCODE_OVER) SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_ENCODE_OVER)
} }
for (int32_t i = 0; i < pStb->numOfTSmas; ++i) {
STSmaObj *pTSma = &pStb->pTSmas[i];
SDB_SET_BINARY(pRaw, dataPos, pTSma->name, TSDB_TABLE_FNAME_LEN, STB_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pTSma->createdTime, STB_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pTSma->uid, STB_ENCODE_OVER)
SDB_SET_INT8(pRaw, dataPos, pTSma->intervalUnit, STB_ENCODE_OVER)
SDB_SET_INT8(pRaw, dataPos, pTSma->slidingUnit, STB_ENCODE_OVER)
SDB_SET_INT8(pRaw, dataPos, pTSma->timezone, STB_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pTSma->interval, STB_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pTSma->offset, STB_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pTSma->sliding, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pTSma->exprLen, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pTSma->tagsFilterLen, STB_ENCODE_OVER)
SDB_SET_BINARY(pRaw, dataPos, pTSma->expr, pTSma->exprLen, STB_ENCODE_OVER)
SDB_SET_BINARY(pRaw, dataPos, pTSma->tagsFilter, pTSma->tagsFilterLen, STB_ENCODE_OVER)
}
SDB_SET_BINARY(pRaw, dataPos, pStb->comment, TSDB_STB_COMMENT_LEN, STB_ENCODE_OVER) SDB_SET_BINARY(pRaw, dataPos, pStb->comment, TSDB_STB_COMMENT_LEN, STB_ENCODE_OVER)
SDB_SET_RESERVE(pRaw, dataPos, TSDB_STB_RESERVE_SIZE, STB_ENCODE_OVER) SDB_SET_RESERVE(pRaw, dataPos, TSDB_STB_RESERVE_SIZE, STB_ENCODE_OVER)
SDB_SET_DATALEN(pRaw, dataPos, STB_ENCODE_OVER) SDB_SET_DATALEN(pRaw, dataPos, STB_ENCODE_OVER)
...@@ -150,6 +168,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) { ...@@ -150,6 +168,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, dataPos, &pStb->nextColId, STB_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->nextColId, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->numOfColumns, STB_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->numOfColumns, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->numOfTags, STB_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->numOfTags, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->numOfTSmas, STB_DECODE_OVER)
pStb->pColumns = calloc(pStb->numOfColumns, sizeof(SSchema)); pStb->pColumns = calloc(pStb->numOfColumns, sizeof(SSchema));
pStb->pTags = calloc(pStb->numOfTags, sizeof(SSchema)); pStb->pTags = calloc(pStb->numOfTags, sizeof(SSchema));
...@@ -173,6 +192,23 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) { ...@@ -173,6 +192,23 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_DECODE_OVER) SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_DECODE_OVER)
} }
for (int32_t i = 0; i < pStb->numOfTSmas; ++i) {
STSmaObj *pTSma = &pStb->pTSmas[i];
SDB_GET_BINARY(pRaw, dataPos, pTSma->name, TSDB_TABLE_FNAME_LEN, STB_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pTSma->createdTime, STB_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pTSma->uid, STB_DECODE_OVER)
SDB_GET_INT8(pRaw, dataPos, &pTSma->intervalUnit, STB_DECODE_OVER)
SDB_GET_INT8(pRaw, dataPos, &pTSma->slidingUnit, STB_DECODE_OVER)
SDB_GET_INT8(pRaw, dataPos, &pTSma->timezone, STB_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pTSma->interval, STB_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pTSma->offset, STB_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pTSma->sliding, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pTSma->exprLen, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pTSma->tagsFilterLen, STB_DECODE_OVER)
SDB_GET_BINARY(pRaw, dataPos, pTSma->expr, pTSma->exprLen, STB_DECODE_OVER)
SDB_GET_BINARY(pRaw, dataPos, pTSma->tagsFilter, pTSma->tagsFilterLen, STB_DECODE_OVER)
}
SDB_GET_BINARY(pRaw, dataPos, pStb->comment, TSDB_STB_COMMENT_LEN, STB_DECODE_OVER) SDB_GET_BINARY(pRaw, dataPos, pStb->comment, TSDB_STB_COMMENT_LEN, STB_DECODE_OVER)
SDB_GET_RESERVE(pRaw, dataPos, TSDB_STB_RESERVE_SIZE, STB_DECODE_OVER) SDB_GET_RESERVE(pRaw, dataPos, TSDB_STB_RESERVE_SIZE, STB_DECODE_OVER)
...@@ -1162,7 +1198,7 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj * ...@@ -1162,7 +1198,7 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
static int32_t mndDropStb(SMnode *pMnode, SNodeMsg *pReq, SDbObj *pDb, SStbObj *pStb) { static int32_t mndDropStb(SMnode *pMnode, SNodeMsg *pReq, SDbObj *pDb, SStbObj *pStb) {
int32_t code = -1; int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK,TRN_TYPE_DROP_STB, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_STB, &pReq->rpcMsg);
if (pTrans == NULL) goto DROP_STB_OVER; if (pTrans == NULL) goto DROP_STB_OVER;
mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name); mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册