提交 5572abe1 编写于 作者: L Liu Jicong

feat(stream): drop task

上级 9ae7dc10
......@@ -2246,6 +2246,25 @@ typedef struct {
int8_t reserved;
} SMqVDeleteRsp;
typedef struct {
char name[TSDB_STREAM_FNAME_LEN];
int64_t streamId;
} SMDropStreamTaskReq;
typedef struct {
int8_t reserved;
} SMDropStreamTaskRsp;
typedef struct {
SMsgHead head;
int64_t leftForVer;
int32_t taskId;
} SVDropStreamTaskReq;
typedef struct {
int8_t reserved;
} SVDropStreamTaskRsp;
typedef struct {
int64_t leftForVer;
int32_t vgId;
......
......@@ -187,6 +187,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqPollReq, SMqDataBlkRsp)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_DISPATCH_WRITE, "vnode-stream-task-dispatch-write", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_DROP, "vnode-stream-task-drop", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL)
......
......@@ -436,6 +436,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_WAL_SIZE_LIMIT TAOS_DEF_ERROR_CODE(0, 0x1002)
#define TSDB_CODE_WAL_INVALID_VER TAOS_DEF_ERROR_CODE(0, 0x1003)
#define TSDB_CODE_WAL_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x1004)
#define TSDB_CODE_WAL_LOG_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x1005)
// http
#define TSDB_CODE_HTTP_SERVER_OFFLINE TAOS_DEF_ERROR_CODE(0, 0x1100) //"http server is not online"
......
......@@ -215,6 +215,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
......@@ -350,6 +350,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
......
......@@ -63,7 +63,7 @@ int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64
.topicQuery = false,
.streamQuery = true,
.rSmaQuery = true,
.triggerType = STREAM_TRIGGER_WINDOW_CLOSE,
.triggerType = STREAM_TRIGGER_AT_ONCE,
.watermark = watermark,
/*.filesFactor = filesFactor,*/
};
......
......@@ -35,7 +35,7 @@ static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pStream, SStreamObj *pNewStream);
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq);
/*static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);*/
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
/*static int32_t mndProcessDropStreamInRsp(SRpcMsg *pRsp);*/
static int32_t mndProcessStreamMetaReq(SRpcMsg *pReq);
static int32_t mndGetStreamMeta(SRpcMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
......@@ -55,9 +55,8 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp);
/*mndSetMsgHandle(pMnode, TDMT_SND_TASK_DEPLOY_RSP, mndTransProcessRsp);*/
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);*/
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/
mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndTransProcessRsp);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream);
......@@ -196,16 +195,6 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) {
sdbRelease(pSdb, pStream);
}
static SDbObj *mndAcquireDbByStream(SMnode *pMnode, char *streamName) {
SName name = {0};
tNameFromString(&name, streamName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
char db[TSDB_STREAM_FNAME_LEN] = {0};
tNameGetFullDbName(&name, db);
return mndAcquireDb(pMnode, db);
}
static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0) {
terrno = TSDB_CODE_MND_INVALID_STREAM_OPTION;
......@@ -370,6 +359,47 @@ _OVER:
return -1;
}
static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
ASSERT(pTask->nodeId != 0);
// vnode
if (pTask->nodeId > 0) {
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pReq->head.vgId = htonl(pTask->nodeId);
pReq->taskId = pTask->taskId;
STransAction action = {0};
memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet));
action.pCont = pReq;
action.contLen = sizeof(SVDropStreamTaskReq);
action.msgType = TDMT_VND_STREAM_TASK_DROP;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
}
}
return 0;
}
static int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
int32_t lv = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < lv; i++) {
SArray *pTasks = taosArrayGetP(pStream->tasks, i);
int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j);
if (mndPersistTaskDropReq(pTrans, pTask) < 0) {
return -1;
}
}
}
return 0;
}
static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq *pCreate, SDbObj *pDb) {
mDebug("stream:%s to create", pCreate->name);
SStreamObj streamObj = {0};
......@@ -464,6 +494,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
goto CREATE_STREAM_OVER;
}
// TODO check auth
pDb = mndAcquireDb(pMnode, createStreamReq.sourceDB);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
......@@ -495,6 +526,47 @@ CREATE_STREAM_OVER:
return code;
}
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
int32_t code = -1;
SStreamObj *pStream = NULL;
/*SDbObj *pDb = NULL;*/
/*SUserObj *pUser = NULL;*/
SMDropStreamTaskReq dropStreamReq = *(SMDropStreamTaskReq *)pReq->pCont;
pStream = mndAcquireStream(pMnode, dropStreamReq.name);
if (pStream == NULL) {
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
return -1;
}
#if 0
// todo check auth
pUser = mndAcquireUser(pMnode, pReq->conn.user);
if (pUser == NULL) {
goto DROP_STREAM_OVER;
}
#endif
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq);
if (pTrans == NULL) {
mError("stream:%s, failed to drop since %s", dropStreamReq.name, terrstr());
return -1;
}
mDebug("trans:%d, used to drop stream:%s", pTrans->id, dropStreamReq.name);
// drop all tasks
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to drop task since %s", dropStreamReq.name, terrstr());
return -1;
}
DROP_STREAM_OVER:
return 0;
}
static int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) {
SSdb *pSdb = pMnode->pSdb;
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
......
......@@ -140,6 +140,7 @@ int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId);
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data);
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg);
......
......@@ -510,3 +510,13 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
streamProcessRecoverRsp(pTask, pRsp);
return 0;
}
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
int32_t code = taosHashRemove(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t));
if (code == 0) {
// sendrsp
}
ASSERT(code == 0);
return code;
}
......@@ -168,6 +168,11 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
goto _err;
}
} break;
case TDMT_VND_STREAM_TASK_DROP: {
if (tqProcessTaskDropReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) {
goto _err;
}
} break;
case TDMT_VND_ALTER_CONFIRM:
vnodeProcessAlterConfirmReq(pVnode, version, pReq, len, pRsp);
break;
......
......@@ -125,7 +125,7 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
}
if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
wError("invalid version: % " PRId64 ", first ver %ld, last ver %ld", ver, pWal->vers.firstVer, pWal->vers.lastVer);
terrno = TSDB_CODE_WAL_INVALID_VER;
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1;
}
if (ver < pWal->vers.snapshotVer) {
......
......@@ -64,7 +64,10 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
int32_t walCommit(SWal *pWal, int64_t ver) {
ASSERT(pWal->vers.commitVer >= pWal->vers.snapshotVer);
ASSERT(pWal->vers.commitVer <= pWal->vers.lastVer);
if (ver < pWal->vers.commitVer || ver > pWal->vers.lastVer) {
if (ver < pWal->vers.commitVer) {
return 0;
}
if (ver > pWal->vers.lastVer) {
terrno = TSDB_CODE_WAL_INVALID_VER;
return -1;
}
......
......@@ -422,6 +422,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, "Unexpected generic er
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, "WAL file is corrupted")
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_SIZE_LIMIT, "WAL size exceeds limit")
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_INVALID_VER, "WAL use invalid version")
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_LOG_NOT_EXIST, "WAL log not exist")
// tfs
TAOS_DEFINE_ERROR(TSDB_CODE_FS_APP_ERROR, "tfs out of memory")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册