未验证 提交 a80f7d74 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #10959 from taosdata/feature/shm

sma
...@@ -269,12 +269,15 @@ typedef struct SSchema { ...@@ -269,12 +269,15 @@ typedef struct SSchema {
typedef struct { typedef struct {
char name[TSDB_TABLE_FNAME_LEN]; char name[TSDB_TABLE_FNAME_LEN];
int8_t igExists; int8_t igExists;
float xFilesFactor;
int32_t aggregationMethod;
int32_t delay;
int32_t numOfColumns; int32_t numOfColumns;
int32_t numOfTags; int32_t numOfTags;
int32_t commentLen; int32_t commentLen;
SArray* pColumns; SArray* pColumns;
SArray* pTags; SArray* pTags;
char *comment; char* comment;
} SMCreateStbReq; } SMCreateStbReq;
int32_t tSerializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq); int32_t tSerializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq);
......
...@@ -508,6 +508,9 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq ...@@ -508,6 +508,9 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
if (tStartEncode(&encoder) < 0) return -1; if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1; if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1;
if (tEncodeFloat(&encoder, pReq->xFilesFactor) < 0) return -1;
if (tEncodeI32(&encoder, pReq->aggregationMethod) < 0) return -1;
if (tEncodeI32(&encoder, pReq->delay) < 0) return -1;
if (tEncodeI32(&encoder, pReq->numOfColumns) < 0) return -1; if (tEncodeI32(&encoder, pReq->numOfColumns) < 0) return -1;
if (tEncodeI32(&encoder, pReq->numOfTags) < 0) return -1; if (tEncodeI32(&encoder, pReq->numOfTags) < 0) return -1;
if (tEncodeI32(&encoder, pReq->commentLen) < 0) return -1; if (tEncodeI32(&encoder, pReq->commentLen) < 0) return -1;
...@@ -541,6 +544,9 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR ...@@ -541,6 +544,9 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
if (tStartDecode(&decoder) < 0) return -1; if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1; if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1;
if (tDecodeFloat(&decoder, &pReq->xFilesFactor) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->aggregationMethod) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->delay) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->numOfColumns) < 0) return -1; if (tDecodeI32(&decoder, &pReq->numOfColumns) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->numOfTags) < 0) return -1; if (tDecodeI32(&decoder, &pReq->numOfTags) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->commentLen) < 0) return -1; if (tDecodeI32(&decoder, &pReq->commentLen) < 0) return -1;
......
...@@ -341,6 +341,9 @@ typedef struct { ...@@ -341,6 +341,9 @@ typedef struct {
int64_t dbUid; int64_t dbUid;
int32_t version; int32_t version;
int32_t nextColId; int32_t nextColId;
float xFilesFactor;
int32_t aggregationMethod;
int32_t delay;
int32_t numOfColumns; int32_t numOfColumns;
int32_t numOfTags; int32_t numOfTags;
int32_t commentLen; int32_t commentLen;
......
...@@ -31,6 +31,8 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream); ...@@ -31,6 +31,8 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream);
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#include "mndMnode.h" #include "mndMnode.h"
#include "mndShow.h" #include "mndShow.h"
#include "mndStb.c" #include "mndStb.c"
#include "mndStream.h"
#include "mndTrans.h" #include "mndTrans.h"
#include "mndUser.h" #include "mndUser.h"
#include "mndVgroup.h" #include "mndVgroup.h"
...@@ -404,6 +405,18 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre ...@@ -404,6 +405,18 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre
memcpy(smaObj.ast, pCreate->ast, smaObj.astLen); memcpy(smaObj.ast, pCreate->ast, smaObj.astLen);
} }
SStreamObj streamObj = {0};
tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN);
tstrncpy(streamObj.db, pDb->name, TSDB_DB_FNAME_LEN);
streamObj.createTime = taosGetTimestampMs();
streamObj.updateTime = streamObj.createTime;
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
streamObj.dbUid = pDb->uid;
streamObj.version = 1;
streamObj.sql = pCreate->sql;
/*streamObj.physicalPlan = "";*/
streamObj.logicalPlan = "not implemented";
int32_t code = -1; int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_SMA, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_SMA, &pReq->rpcMsg);
if (pTrans == NULL) goto _OVER; if (pTrans == NULL) goto _OVER;
...@@ -414,6 +427,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre ...@@ -414,6 +427,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER;
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0; code = 0;
...@@ -457,6 +471,7 @@ static int32_t mndProcessMCreateSmaReq(SNodeMsg *pReq) { ...@@ -457,6 +471,7 @@ static int32_t mndProcessMCreateSmaReq(SNodeMsg *pReq) {
int32_t code = -1; int32_t code = -1;
SStbObj *pStb = NULL; SStbObj *pStb = NULL;
SSmaObj *pSma = NULL; SSmaObj *pSma = NULL;
SStreamObj *pStream = NULL;
SDbObj *pDb = NULL; SDbObj *pDb = NULL;
SUserObj *pUser = NULL; SUserObj *pUser = NULL;
SMCreateSmaReq createReq = {0}; SMCreateSmaReq createReq = {0};
...@@ -476,6 +491,12 @@ static int32_t mndProcessMCreateSmaReq(SNodeMsg *pReq) { ...@@ -476,6 +491,12 @@ static int32_t mndProcessMCreateSmaReq(SNodeMsg *pReq) {
mError("sma:%s, failed to create since stb:%s not exist", createReq.name, createReq.stb); mError("sma:%s, failed to create since stb:%s not exist", createReq.name, createReq.stb);
goto _OVER; goto _OVER;
} }
pStream = mndAcquireStream(pMnode, createReq.name);
if (pStream != NULL) {
mError("sma:%s, failed to create since stream:%s already exist", createReq.name, createReq.name);
goto _OVER;
}
pSma = mndAcquireSma(pMnode, createReq.name); pSma = mndAcquireSma(pMnode, createReq.name);
if (pSma != NULL) { if (pSma != NULL) {
...@@ -514,6 +535,7 @@ _OVER: ...@@ -514,6 +535,7 @@ _OVER:
mndReleaseStb(pMnode, pStb); mndReleaseStb(pMnode, pStb);
mndReleaseSma(pMnode, pSma); mndReleaseSma(pMnode, pSma);
mndReleaseStream(pMnode, pStream);
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
mndReleaseUser(pMnode, pUser); mndReleaseUser(pMnode, pUser);
tFreeSMCreateSmaReq(&createReq); tFreeSMCreateSmaReq(&createReq);
......
...@@ -85,6 +85,9 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) { ...@@ -85,6 +85,9 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
SDB_SET_INT64(pRaw, dataPos, pStb->dbUid, STB_ENCODE_OVER) SDB_SET_INT64(pRaw, dataPos, pStb->dbUid, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->version, STB_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pStb->version, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->nextColId, STB_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pStb->nextColId, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, (int32_t)(pStb->xFilesFactor * 10000), STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->aggregationMethod, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->delay, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->numOfColumns, STB_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pStb->numOfColumns, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->numOfTags, STB_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pStb->numOfTags, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->commentLen, STB_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pStb->commentLen, STB_ENCODE_OVER)
...@@ -148,6 +151,11 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) { ...@@ -148,6 +151,11 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT64(pRaw, dataPos, &pStb->dbUid, STB_DECODE_OVER) SDB_GET_INT64(pRaw, dataPos, &pStb->dbUid, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->version, STB_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->version, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->nextColId, STB_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->nextColId, STB_DECODE_OVER)
int32_t xFilesFactor = 0;
SDB_GET_INT32(pRaw, dataPos, &xFilesFactor, STB_DECODE_OVER)
pStb->xFilesFactor = xFilesFactor / 10000.0f;
SDB_GET_INT32(pRaw, dataPos, &pStb->aggregationMethod, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->delay, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->numOfColumns, STB_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->numOfColumns, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->numOfTags, STB_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->numOfTags, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->commentLen, STB_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->commentLen, STB_DECODE_OVER)
......
...@@ -240,13 +240,13 @@ static SArray *mndExtractNamesFromAst(const SNode *pAst) { ...@@ -240,13 +240,13 @@ static SArray *mndExtractNamesFromAst(const SNode *pAst) {
return names; return names;
} }
static int32_t mndStreamGetPlanString(const SCMCreateStreamReq *pCreate, char **pStr) { static int32_t mndStreamGetPlanString(const char *ast, char **pStr) {
if (NULL == pCreate->ast) { if (NULL == ast) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SNode *pAst = NULL; SNode *pAst = NULL;
int32_t code = nodesStringToNode(pCreate->ast, &pAst); int32_t code = nodesStringToNode(ast, &pAst);
SQueryPlan *pPlan = NULL; SQueryPlan *pPlan = NULL;
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
...@@ -267,22 +267,9 @@ static int32_t mndStreamGetPlanString(const SCMCreateStreamReq *pCreate, char ** ...@@ -267,22 +267,9 @@ static int32_t mndStreamGetPlanString(const SCMCreateStreamReq *pCreate, char **
return code; return code;
} }
static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamReq *pCreate, SDbObj *pDb) { int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans) {
mDebug("stream:%s to create", pCreate->name);
SStreamObj streamObj = {0};
tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN);
tstrncpy(streamObj.db, pDb->name, TSDB_DB_FNAME_LEN);
streamObj.createTime = taosGetTimestampMs();
streamObj.updateTime = streamObj.createTime;
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
streamObj.dbUid = pDb->uid;
streamObj.version = 1;
streamObj.sql = pCreate->sql;
/*streamObj.physicalPlan = "";*/
streamObj.logicalPlan = "not implemented";
SNode *pAst = NULL; SNode *pAst = NULL;
if (nodesStringToNode(pCreate->ast, &pAst) < 0) { if (nodesStringToNode(ast, &pAst) < 0) {
return -1; return -1;
} }
SArray *names = mndExtractNamesFromAst(pAst); SArray *names = mndExtractNamesFromAst(pAst);
...@@ -292,33 +279,55 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe ...@@ -292,33 +279,55 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
} }
printf("\n=======================================================\n"); printf("\n=======================================================\n");
streamObj.outputName = names; pStream->outputName = names;
if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(pCreate, &streamObj.physicalPlan)) { if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(ast, &pStream->physicalPlan)) {
mError("topic:%s, failed to get plan since %s", pCreate->name, terrstr()); mError("topic:%s, failed to get plan since %s", pStream->name, terrstr());
return -1; return -1;
} }
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_STREAM, &pReq->rpcMsg); if (mndScheduleStream(pMnode, pTrans, pStream) < 0) {
if (pTrans == NULL) { mError("stream:%ld, schedule stream since %s", pStream->uid, terrstr());
mError("stream:%s, failed to create since %s", pCreate->name, terrstr());
return -1; return -1;
} }
mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name);
if (mndScheduleStream(pMnode, pTrans, &streamObj) < 0) { SSdbRaw *pRedoRaw = mndStreamActionEncode(pStream);
mError("stream:%ld, schedule stream since %s", streamObj.uid, terrstr()); if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return -1;
} }
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
SSdbRaw *pRedoRaw = mndStreamActionEncode(&streamObj); return 0;
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { }
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamReq *pCreate, SDbObj *pDb) {
mDebug("stream:%s to create", pCreate->name);
SStreamObj streamObj = {0};
tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN);
tstrncpy(streamObj.db, pDb->name, TSDB_DB_FNAME_LEN);
streamObj.createTime = taosGetTimestampMs();
streamObj.updateTime = streamObj.createTime;
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
streamObj.dbUid = pDb->uid;
streamObj.version = 1;
streamObj.sql = pCreate->sql;
/*streamObj.physicalPlan = "";*/
streamObj.logicalPlan = "not implemented";
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_STREAM, &pReq->rpcMsg);
if (pTrans == NULL) {
mError("stream:%s, failed to create since %s", pCreate->name, terrstr());
return -1;
}
mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name);
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) {
mError("trans:%d, failed to add stream since %s", pTrans->id, terrstr());
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return -1;
} }
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
if (mndTransPrepare(pMnode, pTrans) != 0) { if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
......
...@@ -156,7 +156,7 @@ void* MndTestSma::BuildCreateSmaReq(const char* smaname, const char* stbname, in ...@@ -156,7 +156,7 @@ void* MndTestSma::BuildCreateSmaReq(const char* smaname, const char* stbname, in
createReq.tagsFilterLen = strlen(createReq.tagsFilter) + 1; createReq.tagsFilterLen = strlen(createReq.tagsFilter) + 1;
createReq.sql = (char*)sql; createReq.sql = (char*)sql;
createReq.sqlLen = strlen(createReq.sql) + 1; createReq.sqlLen = strlen(createReq.sql) + 1;
createReq.ast = (char*)expr; createReq.ast = (char*)ast;
createReq.astLen = strlen(createReq.ast) + 1; createReq.astLen = strlen(createReq.ast) + 1;
int32_t tlen = tSerializeSMCreateSmaReq(NULL, 0, &createReq); int32_t tlen = tSerializeSMCreateSmaReq(NULL, 0, &createReq);
...@@ -201,7 +201,7 @@ TEST_F(MndTestSma, 01_Create_Show_Meta_Drop_Restart_Stb) { ...@@ -201,7 +201,7 @@ TEST_F(MndTestSma, 01_Create_Show_Meta_Drop_Restart_Stb) {
test.SendShowRetrieveReq(); test.SendShowRetrieveReq();
EXPECT_EQ(test.GetShowRows(), 1); EXPECT_EQ(test.GetShowRows(), 1);
} }
#if 0
{ {
pReq = BuildCreateSmaReq(smaname, stbname, 0, "expr", "tagsFilter", "sql", "ast", &contLen); pReq = BuildCreateSmaReq(smaname, stbname, 0, "expr", "tagsFilter", "sql", "ast", &contLen);
pRsp = test.SendReq(TDMT_MND_CREATE_SMA, pReq, contLen); pRsp = test.SendReq(TDMT_MND_CREATE_SMA, pReq, contLen);
...@@ -233,4 +233,5 @@ TEST_F(MndTestSma, 01_Create_Show_Meta_Drop_Restart_Stb) { ...@@ -233,4 +233,5 @@ TEST_F(MndTestSma, 01_Create_Show_Meta_Drop_Restart_Stb) {
test.SendShowRetrieveReq(); test.SendShowRetrieveReq();
EXPECT_EQ(test.GetShowRows(), 0); EXPECT_EQ(test.GetShowRows(), 0);
} }
#endif
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册