提交 4734795f 编写于 作者: 5 54liuyao

feat(stream):stream insert data into an existing table

上级 fb628f7f
...@@ -1751,6 +1751,8 @@ typedef struct { ...@@ -1751,6 +1751,8 @@ typedef struct {
#define STREAM_FILL_HISTORY_ON 1 #define STREAM_FILL_HISTORY_ON 1
#define STREAM_FILL_HISTORY_OFF 0 #define STREAM_FILL_HISTORY_OFF 0
#define STREAM_DEFAULT_FILL_HISTORY STREAM_FILL_HISTORY_OFF #define STREAM_DEFAULT_FILL_HISTORY STREAM_FILL_HISTORY_OFF
#define STREAM_CREATE_STABLE_TRUE 1
#define STREAM_CREATE_STABLE_FALSE 0
typedef struct { typedef struct {
char name[TSDB_STREAM_FNAME_LEN]; char name[TSDB_STREAM_FNAME_LEN];
...@@ -1768,6 +1770,8 @@ typedef struct { ...@@ -1768,6 +1770,8 @@ typedef struct {
SArray* pTags; // array of SField SArray* pTags; // array of SField
// 3.0.20 // 3.0.20
int64_t checkpointFreq; // ms int64_t checkpointFreq; // ms
// 3.0.2.3
int8_t createStb;
} SCMCreateStreamReq; } SCMCreateStreamReq;
typedef struct { typedef struct {
......
...@@ -5424,6 +5424,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS ...@@ -5424,6 +5424,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
if (tEncodeI32(&encoder, pField->bytes) < 0) return -1; if (tEncodeI32(&encoder, pField->bytes) < 0) return -1;
if (tEncodeCStr(&encoder, pField->name) < 0) return -1; if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
} }
if (tEncodeI8(&encoder, pReq->createStb) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
...@@ -5484,6 +5485,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea ...@@ -5484,6 +5485,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
} }
} }
} }
if (tDecodeI8(&decoder, &pReq->createStb) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
......
...@@ -637,7 +637,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { ...@@ -637,7 +637,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER; if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER;
// create stb for stream // create stb for stream
if (mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) { if (createStreamReq.createStb == STREAM_CREATE_STABLE_TRUE && mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) {
mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createStreamReq.name, terrstr()); mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createStreamReq.name, terrstr());
mndTransDrop(pTrans); mndTransDrop(pTrans);
goto _OVER; goto _OVER;
......
...@@ -5766,6 +5766,8 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt* ...@@ -5766,6 +5766,8 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt*
pReq->numOfTags = LIST_LENGTH(pStmt->pTags); pReq->numOfTags = LIST_LENGTH(pStmt->pTags);
} }
pReq->createStb = STREAM_CREATE_STABLE_TRUE;
return code; return code;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册