diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 4b624e333fb827aef4cdaf6955808765f2896efc..119c1bab2d5e981b65d0989b5a269922a9f98998 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2246,6 +2246,25 @@ typedef struct { int8_t reserved; } SMqVDeleteRsp; +typedef struct { + char name[TSDB_STREAM_FNAME_LEN]; + int64_t streamId; +} SMDropStreamTaskReq; + +typedef struct { + int8_t reserved; +} SMDropStreamTaskRsp; + +typedef struct { + SMsgHead head; + int64_t leftForVer; + int32_t taskId; +} SVDropStreamTaskReq; + +typedef struct { + int8_t reserved; +} SVDropStreamTaskRsp; + typedef struct { int64_t leftForVer; int32_t vgId; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index f2ba16ff47f6eb71843582d062f2f772236939d7..f89f548fc5b97bca0fe6cfb0164debda72627c77 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -187,6 +187,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqPollReq, SMqDataBlkRsp) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_DISPATCH_WRITE, "vnode-stream-task-dispatch-write", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_DROP, "vnode-stream-task-drop", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 6fc84e023daa12bf008a79c8825fde2ae6e23498..7214770a632e622cd2c85e58441e65be5e1c9500 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -436,6 +436,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_WAL_SIZE_LIMIT TAOS_DEF_ERROR_CODE(0, 0x1002) #define TSDB_CODE_WAL_INVALID_VER TAOS_DEF_ERROR_CODE(0, 0x1003) #define TSDB_CODE_WAL_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x1004) +#define TSDB_CODE_WAL_LOG_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x1005) // http #define TSDB_CODE_HTTP_SERVER_OFFLINE TAOS_DEF_ERROR_CODE(0, 0x1100) //"http server is not online" diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 8cda8fcec3906683a4f98eeb2aebe8ce54823f7c..d895b73fb0425955df6f6b1565d9a6369af6fa40 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -215,6 +215,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 8d3f3e928459126c49e9155e5c7b2fd32e90011c..9946185da6d9a65642dbf725a9456dfb03acbacc 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -350,6 +350,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index a2ceb8f90eca468e8b0b8563bdfab869ad8ecc30..ffddab3a91f98d08b058399cf9ef5753d826a49c 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -63,7 +63,7 @@ int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64 .topicQuery = false, .streamQuery = true, .rSmaQuery = true, - .triggerType = STREAM_TRIGGER_WINDOW_CLOSE, + .triggerType = STREAM_TRIGGER_AT_ONCE, .watermark = watermark, /*.filesFactor = filesFactor,*/ }; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index f447a93392d4e0cbb161a646f7accc1c845d8d7f..2e8974bd9ea65db6aa11ecd39aa2b7a7eac413e8 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -35,7 +35,7 @@ static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream); static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream); static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pStream, SStreamObj *pNewStream); static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq); -/*static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);*/ +static int32_t mndProcessDropStreamReq(SRpcMsg *pReq); /*static int32_t mndProcessDropStreamInRsp(SRpcMsg *pRsp);*/ static int32_t mndProcessStreamMetaReq(SRpcMsg *pReq); static int32_t mndGetStreamMeta(SRpcMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); @@ -55,9 +55,8 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp); - /*mndSetMsgHandle(pMnode, TDMT_SND_TASK_DEPLOY_RSP, mndTransProcessRsp);*/ - /*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);*/ - /*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/ + mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq); + mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndTransProcessRsp); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream); @@ -196,16 +195,6 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) { sdbRelease(pSdb, pStream); } -static SDbObj *mndAcquireDbByStream(SMnode *pMnode, char *streamName) { - SName name = {0}; - tNameFromString(&name, streamName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - - char db[TSDB_STREAM_FNAME_LEN] = {0}; - tNameGetFullDbName(&name, db); - - return mndAcquireDb(pMnode, db); -} - static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) { if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0) { terrno = TSDB_CODE_MND_INVALID_STREAM_OPTION; @@ -370,6 +359,47 @@ _OVER: return -1; } +static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) { + ASSERT(pTask->nodeId != 0); + + // vnode + if (pTask->nodeId > 0) { + SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq)); + if (pReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pReq->head.vgId = htonl(pTask->nodeId); + pReq->taskId = pTask->taskId; + STransAction action = {0}; + memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet)); + action.pCont = pReq; + action.contLen = sizeof(SVDropStreamTaskReq); + action.msgType = TDMT_VND_STREAM_TASK_DROP; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(pReq); + return -1; + } + } + + return 0; +} + +static int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { + int32_t lv = taosArrayGetSize(pStream->tasks); + for (int32_t i = 0; i < lv; i++) { + SArray *pTasks = taosArrayGetP(pStream->tasks, i); + int32_t sz = taosArrayGetSize(pTasks); + for (int32_t j = 0; j < sz; j++) { + SStreamTask *pTask = taosArrayGetP(pTasks, j); + if (mndPersistTaskDropReq(pTrans, pTask) < 0) { + return -1; + } + } + } + return 0; +} + static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq *pCreate, SDbObj *pDb) { mDebug("stream:%s to create", pCreate->name); SStreamObj streamObj = {0}; @@ -464,6 +494,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { goto CREATE_STREAM_OVER; } + // TODO check auth pDb = mndAcquireDb(pMnode, createStreamReq.sourceDB); if (pDb == NULL) { terrno = TSDB_CODE_MND_DB_NOT_SELECTED; @@ -495,6 +526,47 @@ CREATE_STREAM_OVER: return code; } +static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + int32_t code = -1; + SStreamObj *pStream = NULL; + /*SDbObj *pDb = NULL;*/ + /*SUserObj *pUser = NULL;*/ + + SMDropStreamTaskReq dropStreamReq = *(SMDropStreamTaskReq *)pReq->pCont; + + pStream = mndAcquireStream(pMnode, dropStreamReq.name); + + if (pStream == NULL) { + terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; + return -1; + } + +#if 0 + // todo check auth + pUser = mndAcquireUser(pMnode, pReq->conn.user); + if (pUser == NULL) { + goto DROP_STREAM_OVER; + } +#endif + + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq); + if (pTrans == NULL) { + mError("stream:%s, failed to drop since %s", dropStreamReq.name, terrstr()); + return -1; + } + mDebug("trans:%d, used to drop stream:%s", pTrans->id, dropStreamReq.name); + + // drop all tasks + if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) { + mError("stream:%s, failed to drop task since %s", dropStreamReq.name, terrstr()); + return -1; + } + +DROP_STREAM_OVER: + return 0; +} + static int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) { SSdb *pSdb = pMnode->pSdb; SDbObj *pDb = mndAcquireDb(pMnode, dbName); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 52593f7afb870d28a3f9fd3dbc5248baf35edd62..2fd08151812096afab8eafcd98b9245711da1818 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -140,6 +140,7 @@ int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId); int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); +int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data); int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 7d2dfeeba54a6438d3994cd5a9957a1345f53d20..c9d3267adb8414009bfd427f1d2c335070fe5815 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -510,3 +510,13 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) { streamProcessRecoverRsp(pTask, pRsp); return 0; } + +int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { + SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; + int32_t code = taosHashRemove(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t)); + if (code == 0) { + // sendrsp + } + ASSERT(code == 0); + return code; +} diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 34ff9ffa0fd2deeb86a0580af1648bfedc84e125..4a9666292c2245396dadd70a53a443eae5cf8726 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -168,6 +168,11 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp goto _err; } } break; + case TDMT_VND_STREAM_TASK_DROP: { + if (tqProcessTaskDropReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) { + goto _err; + } + } break; case TDMT_VND_ALTER_CONFIRM: vnodeProcessAlterConfirmReq(pVnode, version, pReq, len, pRsp); break; diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index f92c65096514e2f9696bc1fd7dddd3b1edc2d0c7..e940191ceafe7688f606df47114920360e16dad0 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -125,7 +125,7 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) { } if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) { wError("invalid version: % " PRId64 ", first ver %ld, last ver %ld", ver, pWal->vers.firstVer, pWal->vers.lastVer); - terrno = TSDB_CODE_WAL_INVALID_VER; + terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; return -1; } if (ver < pWal->vers.snapshotVer) { diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index d30e0b6844d84a061813811557a800bf8efb440f..9cbc9a3b020580c5c859a3731a0ce5f15fc5d748 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -64,7 +64,10 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) { int32_t walCommit(SWal *pWal, int64_t ver) { ASSERT(pWal->vers.commitVer >= pWal->vers.snapshotVer); ASSERT(pWal->vers.commitVer <= pWal->vers.lastVer); - if (ver < pWal->vers.commitVer || ver > pWal->vers.lastVer) { + if (ver < pWal->vers.commitVer) { + return 0; + } + if (ver > pWal->vers.lastVer) { terrno = TSDB_CODE_WAL_INVALID_VER; return -1; } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 621f947b64c92bdb7582d2a1f0916f977792a18e..99a76446cd262d783b2967589919c9e2f9951ec5 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -422,6 +422,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, "Unexpected generic er TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, "WAL file is corrupted") TAOS_DEFINE_ERROR(TSDB_CODE_WAL_SIZE_LIMIT, "WAL size exceeds limit") TAOS_DEFINE_ERROR(TSDB_CODE_WAL_INVALID_VER, "WAL use invalid version") +TAOS_DEFINE_ERROR(TSDB_CODE_WAL_LOG_NOT_EXIST, "WAL log not exist") // tfs TAOS_DEFINE_ERROR(TSDB_CODE_FS_APP_ERROR, "tfs out of memory")