diff --git a/examples/c/stream_demo.c b/examples/c/stream_demo.c index dd4fbc8d2d400bc9fc257202bda1979a62041895..2fcf4dd62c1e0a2f5aabda4ce5eb9fae6aa72be8 100644 --- a/examples/c/stream_demo.c +++ b/examples/c/stream_demo.c @@ -98,10 +98,9 @@ int32_t create_stream() { /*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/ /*const char* sql = "select sum(k) from tu1 interval(10m)";*/ /*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/ - pRes = - taos_query(pConn, - "create stream stream1 trigger max_delay 10s into outstb as select _wstart, sum(k) from st1 partition " - "by tbname session(ts, 10s) "); + pRes = taos_query(pConn, + "create stream stream1 trigger max_delay 10s watermark 10s into outstb as select _wstart start, " + "count(k) from st1 partition by tbname interval(20s) "); if (taos_errno(pRes) != 0) { printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/include/common/tcommon.h b/include/common/tcommon.h index e04d9d5e86738d19155402e25c05fbeee6be85bd..dbe020f7ecaf869f2e3fdb99fb86e33c5f873ecb 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -60,6 +60,7 @@ enum { STREAM_INPUT__DATA_RETRIEVE, STREAM_INPUT__GET_RES, STREAM_INPUT__CHECKPOINT, + STREAM_INPUT__DESTROY, }; typedef enum EStreamType { diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index e6fcb021d55639c80158f12c191d772336057e1b..384c6a289f304e4c59a097663bb4224e979bd226 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -53,6 +53,7 @@ enum { TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE, TASK_SCHED_STATUS__FAILED, + TASK_SCHED_STATUS__DROPPING, }; enum { @@ -127,6 +128,10 @@ typedef struct { int8_t type; } SStreamCheckpoint; +typedef struct { + int8_t type; +} SStreamTaskDestroy; + typedef struct { int8_t type; SSDataBlock* pBlock; @@ -211,7 +216,6 @@ typedef struct { void* vnode; FTbSink* tbSinkFunc; STSchema* pTSchema; - SHashObj* pHash; // groupId to tbuid } STaskSinkTb; typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index d7ec3697afe6500b38eb102339d4f3a7ea77d550..c3796fbadd123f296cef0c8e6f59a5d60f38070d 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -291,6 +291,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_STREAM_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03F1) #define TSDB_CODE_MND_INVALID_STREAM_OPTION TAOS_DEF_ERROR_CODE(0, 0x03F2) #define TSDB_CODE_MND_STREAM_MUST_BE_DELETED TAOS_DEF_ERROR_CODE(0, 0x03F3) +#define TSDB_CODE_MND_STREAM_TASK_DROPPED TAOS_DEF_ERROR_CODE(0, 0x03F4) // mnode-sma #define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480) diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index a24b7ef4597ceb1d5aba35efe907e9a7e12cb0a8..3bfd7eb5964a446698556551bbe572f2dc568110 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -424,6 +424,8 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { } mndAddTaskToTaskSet(taskSourceLevel, pTask); + pTask->triggerParam = 0; + // source pTask->taskLevel = TASK_LEVEL__SOURCE; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index dd2b595c2953232f4ccbf3b5f399ecd477823917..6083a76981dc52c1046ea9ba756b133e20f22407 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -2021,8 +2021,7 @@ static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName, FOREACH(pNode, pNodeList) { SColumnNode *pCol = (SColumnNode *)pNode; - if (pCol->tableId != suid) { - mDebug("topic:%s, check colId:%d passed", pTopic->name, pCol->colId); + if (pCol->tableId == suid) { sdbRelease(pSdb, pTopic); nodesDestroyNode(pAst); return -1; @@ -2045,6 +2044,16 @@ static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName, pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); if (pIter == NULL) break; + if (pStream->smaId != 0) { + sdbRelease(pSdb, pStream); + continue; + } + + if (pStream->targetStbUid == suid) { + sdbRelease(pSdb, pStream); + return -1; + } + SNode *pAst = NULL; if (nodesStringToNode(pStream->ast, &pAst) != 0) { ASSERT(0); @@ -2057,8 +2066,7 @@ static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName, FOREACH(pNode, pNodeList) { SColumnNode *pCol = (SColumnNode *)pNode; - if (pCol->tableId != suid) { - mDebug("stream:%s, check colId:%d passed", pStream->name, pCol->colId); + if (pCol->tableId == suid) { sdbRelease(pSdb, pStream); nodesDestroyNode(pAst); return -1; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a98fea19887b66926a7469b12c6d94f97853f051..c6bc8e6e598507ea820bb109e84fbb41a0ed099b 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -628,8 +628,6 @@ int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe } int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) { - int32_t code = 0; - if (pTask->taskLevel == TASK_LEVEL__AGG) { ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0); } @@ -640,8 +638,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) { pTask->outputQueue = streamQueueOpen(); if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) { - code = -1; - goto FAIL; + return -1; } pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; @@ -686,14 +683,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) { streamSetupTrigger(pTask); - tqInfo("deploy stream task on vg %d, task id %d, child id %d", TD_VID(pTq->pVnode), pTask->taskId, + tqInfo("expand stream task on vg %d, task id %d, child id %d", TD_VID(pTq->pVnode), pTask->taskId, pTask->selfChildId); - -FAIL: - if (pTask->inputQueue) streamQueueClose(pTask->inputQueue); - if (pTask->outputQueue) streamQueueClose(pTask->outputQueue); - // TODO free executor - return code; + return 0; } int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 55630511bf22a3fc778b733a85dabfe5442f25d6..522bf46aa1fb9d28225f3118b9b6e1bed7a6cd75 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -231,34 +231,35 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { ASSERT(pTask->tbSink.pTSchema); deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); - SSubmitReq* pReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid, - pTask->tbSink.stbFullName, &deleteReq); + SSubmitReq* submitReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid, + pTask->tbSink.stbFullName, &deleteReq); tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId); - int32_t code; - int32_t len; - tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code); - if (code < 0) { - // - ASSERT(0); - } - SEncoder encoder; - void* buf = rpcMallocCont(len + sizeof(SMsgHead)); - void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - tEncoderInit(&encoder, abuf, len); - tEncodeSBatchDeleteReq(&encoder, &deleteReq); - tEncoderClear(&encoder); + if (taosArrayGetSize(deleteReq.deleteReqs) != 0) { + int32_t code; + int32_t len; + tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code); + if (code < 0) { + // + ASSERT(0); + } + SEncoder encoder; + void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead)); + void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead)); + tEncoderInit(&encoder, abuf, len); + tEncodeSBatchDeleteReq(&encoder, &deleteReq); + tEncoderClear(&encoder); - ((SMsgHead*)buf)->vgId = pVnode->config.vgId; + ((SMsgHead*)serializedDeleteReq)->vgId = pVnode->config.vgId; - if (taosArrayGetSize(deleteReq.deleteReqs) != 0) { SRpcMsg msg = { .msgType = TDMT_VND_BATCH_DEL, - .pCont = buf, + .pCont = serializedDeleteReq, .contLen = len + sizeof(SMsgHead), }; if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { + rpcFreeCont(serializedDeleteReq); tqDebug("failed to put into write-queue since %s", terrstr()); } } @@ -268,11 +269,12 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { // build write msg SRpcMsg msg = { .msgType = TDMT_VND_SUBMIT, - .pCont = pReq, - .contLen = ntohl(pReq->length), + .pCont = submitReq, + .contLen = ntohl(submitReq->length), }; if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { + rpcFreeCont(submitReq); tqDebug("failed to put into write-queue since %s", terrstr()); } } diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index 3776cb261f5f19cf291f66c28efc57a54489a592..6e30eeaa8643273da0669159571caad793e8b034 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -32,7 +32,6 @@ typedef struct { static SStreamGlobalEnv streamEnv; -int32_t streamExec(SStreamTask* pTask); int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch); int32_t streamDispatch(SStreamTask* pTask); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 6da7d4fd59028ac09a4493dfcd311769c619d52d..d6e87c27366da27dda96a41c7a9d2fda92c652a9 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -185,7 +185,9 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S tFreeStreamDispatchReq(pReq); if (exec) { - streamTryExec(pTask); + if (streamTryExec(pTask) < 0) { + return -1; + } if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { streamDispatch(pTask); @@ -221,7 +223,9 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) { } int32_t streamProcessRunReq(SStreamTask* pTask) { - streamTryExec(pTask); + if (streamTryExec(pTask) < 0) { + return -1; + } if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { streamDispatch(pTask); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 64a9537e6c43df3d5748e05908bb725c759e0631..5ff700546cf63acd3e0c4d0798383d6724947de1 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -15,6 +15,7 @@ #include "executor.h" #include "tstream.h" +#include "ttimer.h" SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc) { SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); @@ -99,16 +100,19 @@ int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* goto FAIL; } - taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)); + if (taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) { + goto FAIL; + } if (tdbTbUpsert(pMeta->pTaskDb, &pTask->taskId, sizeof(int32_t), msg, msgLen, &pMeta->txn) < 0) { + taosHashRemove(pMeta->pTasks, &pTask->taskId, sizeof(int32_t)); ASSERT(0); - return -1; + goto FAIL; } return 0; FAIL: - if (pTask) taosMemoryFree(pTask); + if (pTask) tFreeSStreamTask(pTask); return -1; } @@ -158,11 +162,28 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { SStreamTask* pTask = *ppTask; taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t)); atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING); - } - if (tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), &pMeta->txn) < 0) { - /*return -1;*/ + if (tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), &pMeta->txn) < 0) { + /*return -1;*/ + } + + if (pTask->triggerParam != 0) { + taosTmrStop(pTask->timer); + } + + while (1) { + int8_t schedStatus = + atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE, TASK_SCHED_STATUS__DROPPING); + if (schedStatus == TASK_SCHED_STATUS__INACTIVE) { + tFreeSStreamTask(pTask); + break; + } else if (schedStatus == TASK_SCHED_STATUS__DROPPING) { + break; + } + taosMsleep(10); + } } + return 0; } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 6819e5329fdcbac011ae258e89e2f665f85ebbca..ac10c8258744f178bd2010543176f545b40c88b6 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "tstream.h" +#include "streamInc.h" SStreamQueue* streamQueueOpen() { SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue)); @@ -36,9 +36,12 @@ void streamQueueClose(SStreamQueue* queue) { while (1) { void* qItem = streamQueueNextItem(queue); if (qItem) { - taosFreeQitem(qItem); + streamFreeQitem(qItem); } else { - return; + break; } } + taosFreeQall(queue->qall); + taosCloseQueue(queue->queue); + taosMemoryFree(queue); } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 638d39e5cc386ccd5922936dc9606f9b3aa2d8cd..d588d90543d90e19699c5c901672b6ab4fdd2c88 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -152,9 +152,17 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { } void tFreeSStreamTask(SStreamTask* pTask) { - streamQueueClose(pTask->inputQueue); - streamQueueClose(pTask->outputQueue); + if (pTask->inputQueue) streamQueueClose(pTask->inputQueue); + if (pTask->outputQueue) streamQueueClose(pTask->outputQueue); if (pTask->exec.qmsg) taosMemoryFree(pTask->exec.qmsg); if (pTask->exec.executor) qDestroyTask(pTask->exec.executor); + taosArrayDestroy(pTask->childEpInfo); + if (pTask->outputType == TASK_OUTPUT__TABLE) { + tDeleteSSchemaWrapper(pTask->tbSink.pSchemaWrapper); + taosMemoryFree(pTask->tbSink.pTSchema); + } + if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { + taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos); + } taosMemoryFree(pTask); } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 7b06967940e42a8b0cef13a775fa64bfe512719c..f6b62b5ea8a630d79c24d42cd7b9077c884bb0c6 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -293,6 +293,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_CGROUP_USED, "Consumer group being TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_ALREADY_EXIST, "Stream already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_NOT_EXIST, "Stream not exist") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STREAM_OPTION, "Invalid stream option") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_MUST_BE_DELETED, "Stream must be dropped first") // mnode-sma TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists")