提交 212144a3 编写于 作者: L Liu Jicong

test(stream): window close not work for stb

上级 0f3fc6cf
...@@ -25,19 +25,21 @@ int32_t init_env() { ...@@ -25,19 +25,21 @@ int32_t init_env() {
return -1; return -1;
} }
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2"); TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes)); printf("error in create db, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
#if 0
pRes = taos_query(pConn, "create database if not exists abc2 vgroups 20"); pRes = taos_query(pConn, "create database if not exists abc2 vgroups 20");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes)); printf("error in create db, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
#endif
pRes = taos_query(pConn, "use abc1"); pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
...@@ -88,9 +90,9 @@ int32_t create_stream() { ...@@ -88,9 +90,9 @@ int32_t create_stream() {
/*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/ /*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/ /*const char* sql = "select sum(k) from tu1 interval(10m)";*/
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/ /*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
pRes = taos_query( pRes = taos_query(pConn,
pConn, "create stream stream1 trigger window_close into outstb as select _wstartts, sum(k) from st1 "
"create stream stream1 trigger at_once into abc1.outstb as select _wstartts, sum(k) from st1 interval(10m) "); "interval(10s) ");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes)); printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
......
...@@ -183,6 +183,7 @@ SArray *mmGetMsgHandles() { ...@@ -183,6 +183,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_SMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_SMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_INDEX, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_GET_INDEX, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_TABLE_INDEX, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_GET_TABLE_INDEX, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_TOPIC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_TOPIC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
...@@ -496,6 +496,9 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -496,6 +496,9 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
// input // input
pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK; pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK;
// trigger
pTask->triggerParam = pStream->triggerParam;
// sink or dispatch // sink or dispatch
if (hasExtraSink) { if (hasExtraSink) {
mndAddDispatcherToInnerTask(pMnode, pTrans, pStream, pTask); mndAddDispatcherToInnerTask(pMnode, pTrans, pStream, pTask);
......
...@@ -408,6 +408,7 @@ static int32_t mndProcessDropSnodeReq(SRpcMsg *pReq) { ...@@ -408,6 +408,7 @@ static int32_t mndProcessDropSnodeReq(SRpcMsg *pReq) {
goto _OVER; goto _OVER;
} }
// check deletable
code = mndDropSnode(pMnode, pReq, pObj); code = mndDropSnode(pMnode, pReq, pObj);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
......
...@@ -518,7 +518,6 @@ static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq ...@@ -518,7 +518,6 @@ static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq
// TODO // TODO
streamObj.fixedSinkVgId = 0; streamObj.fixedSinkVgId = 0;
streamObj.smaId = 0; streamObj.smaId = 0;
/*streamObj.physicalPlan = "";*/
streamObj.trigger = pCreate->triggerType; streamObj.trigger = pCreate->triggerType;
streamObj.watermark = pCreate->watermark; streamObj.watermark = pCreate->watermark;
streamObj.triggerParam = pCreate->maxDelay; streamObj.triggerParam = pCreate->maxDelay;
...@@ -607,17 +606,6 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { ...@@ -607,17 +606,6 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
} }
#endif #endif
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq);
if (pTrans == NULL) {
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
goto _OVER;
}
mndTransSetDbName(pTrans, createStreamReq.sourceDB, NULL);
// TODO
/*mndTransSetDbName(pTrans, streamObj.targetDb, NULL);*/
mDebug("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name);
// build stream obj from request // build stream obj from request
SStreamObj streamObj = {0}; SStreamObj streamObj = {0};
if (mndBuildStreamObjFromCreateReq(pMnode, &streamObj, &createStreamReq) < 0) { if (mndBuildStreamObjFromCreateReq(pMnode, &streamObj, &createStreamReq) < 0) {
...@@ -626,6 +614,14 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { ...@@ -626,6 +614,14 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
goto _OVER; goto _OVER;
} }
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq);
if (pTrans == NULL) {
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
goto _OVER;
}
mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetDb);
mDebug("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name);
// create stb for stream // create stb for stream
if (mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) { if (mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) {
mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createStreamReq.name, terrstr()); mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createStreamReq.name, terrstr());
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print =============== create database
sql create database test vgroups 1
sql show databases
if $rows != 3 then
return -1
endi
print $data00 $data01 $data02
sql use test
sql create stable st(ts timestamp, a int) tags(t int);
sql create table tu1 using st tags(1);
sql create table tu2 using st tags(2);
sql create stream stream1 trigger window_close into streamt as select _wstartts, sum(a) from st interval(10s);
sql insert into tu1 values(now, 1);
sleep 300
sql select * from streamt;
if $rows != 0 then
print ======$rows
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册