diff --git a/example/src/tstream.c b/example/src/tstream.c index 5bd833213d7706b1151f3b565d4b7e567f62c655..65fd005954353222d50a1a55b855485380dd8e14 100644 --- a/example/src/tstream.c +++ b/example/src/tstream.c @@ -81,10 +81,10 @@ int32_t create_stream() { /*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/ /*const char* sql = "select sum(k) from tu1 interval(10m)";*/ /*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/ - pRes = - taos_query(pConn, - "create stream stream1 trigger window_close as select _wstartts, min(k), max(k), sum(k) as sum_of_k " - "from tu1 interval(10m)"); + pRes = taos_query( + pConn, + "create stream stream1 trigger window_close into outstb as select _wstartts, min(k), max(k), sum(k) as sum_of_k " + "from tu1 interval(10m)"); if (taos_errno(pRes) != 0) { printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 89638524ae8622930eee95bbf4f01fa2778a9c6b..9ae7922f6b56b7ab3b0ed82b73289974c8b37897 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -72,7 +72,7 @@ typedef enum { TRN_TYPE_DROP_USER = 1003, TRN_TYPE_CREATE_FUNC = 1004, TRN_TYPE_DROP_FUNC = 1005, - + TRN_TYPE_CREATE_SNODE = 1006, TRN_TYPE_DROP_SNODE = 1007, TRN_TYPE_CREATE_QNODE = 1008, @@ -601,7 +601,6 @@ typedef struct { int32_t triggerParam; int64_t waterMark; char* sql; - char* logicalPlan; char* physicalPlan; SArray* tasks; // SArray> SSchemaWrapper outputSchema; diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 0ec4332d37150c3dcd45644479497d7bf1dc6da0..02a8eaa832e40c556069a5f1439e91155fb81af6 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -419,7 +419,6 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre streamObj.fixedSinkVgId = smaObj.dstVgId; streamObj.smaId = smaObj.uid; /*streamObj.physicalPlan = "";*/ - streamObj.logicalPlan = "not implemented"; int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_SMA, &pReq->rpcMsg); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 1404b1cd94714fff00370e8513eb9560d980de4b..aa7f16e10d116a8ff404280056cd66fabc6f180a 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -290,7 +290,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast return 0; } -static SStbObj *mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) { +static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) { SStbObj *pStb = NULL; SDbObj *pDb = NULL; SUserObj *pUser = NULL; @@ -301,7 +301,22 @@ static SStbObj *mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStr createReq.numOfTags = 1; // group id createReq.pColumns = taosArrayInit(createReq.numOfColumns, sizeof(SField)); // build fields + taosArraySetSize(createReq.pColumns, createReq.numOfColumns); + for (int32_t i = 0; i < createReq.numOfColumns; i++) { + SField *pField = taosArrayGet(createReq.pColumns, i); + tstrncpy(pField->name, pStream->outputSchema.pSchema[i].name, TSDB_COL_NAME_LEN); + pField->flags = pStream->outputSchema.pSchema[i].flags; + pField->type = pStream->outputSchema.pSchema[i].type; + pField->bytes = pStream->outputSchema.pSchema[i].bytes; + } + createReq.pTags = taosArrayInit(createReq.numOfTags, sizeof(SField)); + taosArraySetSize(createReq.pTags, 1); // build tags + SField *pField = taosArrayGet(createReq.pTags, 0); + strcpy(pField->name, "group_id"); + pField->type = TSDB_DATA_TYPE_UBIGINT; + pField->flags = 0; + pField->bytes = 8; if (mndCheckCreateStbReq(&createReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; @@ -342,18 +357,14 @@ static SStbObj *mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStr goto _OVER; } - if (mndBuildStbFromReq(pMnode, pStb, &createReq, pDb) != 0) { - goto _OVER; - } - if (mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj) < 0) goto _OVER; - return pStb; + return 0; _OVER: mndReleaseStb(pMnode, pStb); mndReleaseDb(pMnode, pDb); mndReleaseUser(pMnode, pUser); - return NULL; + return -1; } static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamReq *pCreate, SDbObj *pDb) { @@ -373,7 +384,6 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe streamObj.fixedSinkVgId = 0; streamObj.smaId = 0; /*streamObj.physicalPlan = "";*/ - streamObj.logicalPlan = "not implemented"; streamObj.trigger = pCreate->triggerType; streamObj.waterMark = pCreate->watermark; @@ -384,16 +394,14 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe } mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name); -#if 0 - if (mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->user) < 0) { + if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pCreate->triggerType, pCreate->watermark, pTrans) != 0) { mError("trans:%d, failed to add stream since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); return -1; } -#endif - if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pCreate->triggerType, pCreate->watermark, pTrans) != 0) { - mError("trans:%d, failed to add stream since %s", pTrans->id, terrstr()); + if (streamObj.targetSTbName[0] && mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->user) < 0) { + mError("trans:%d, failed to create stb for stream since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); return -1; } diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 70a3559cd9f92d6b48d5c6aee68cd72eb05b4424..4fe07029f1ac14a29eee3225357324c56d1f9df3 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -49,7 +49,10 @@ void walCloseReadHandle(SWalReadHandle *pRead) { taosMemoryFree(pRead); } -int32_t walRegisterRead(SWalReadHandle *pRead, int64_t ver) { return 0; } +int32_t walRegisterRead(SWalReadHandle *pRead, int64_t ver) { + // TODO + return 0; +} static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, int64_t ver) { int code = 0;