提交 853e6e29 编写于 作者: L Liu Jicong

refactor(mnode): drop stream task

上级 2f8f2e09
......@@ -98,10 +98,9 @@ int32_t create_stream() {
/*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
pRes =
taos_query(pConn,
"create stream stream1 trigger max_delay 10s into outstb as select _wstart, sum(k) from st1 partition "
"by tbname session(ts, 10s) ");
pRes = taos_query(pConn,
"create stream stream1 trigger max_delay 10s watermark 10s into outstb as select _wstart start, "
"count(k) from st1 partition by tbname interval(20s) ");
if (taos_errno(pRes) != 0) {
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
return -1;
......
......@@ -60,6 +60,7 @@ enum {
STREAM_INPUT__DATA_RETRIEVE,
STREAM_INPUT__GET_RES,
STREAM_INPUT__CHECKPOINT,
STREAM_INPUT__DESTROY,
};
typedef enum EStreamType {
......
......@@ -53,6 +53,7 @@ enum {
TASK_SCHED_STATUS__WAITING,
TASK_SCHED_STATUS__ACTIVE,
TASK_SCHED_STATUS__FAILED,
TASK_SCHED_STATUS__DROPPING,
};
enum {
......@@ -127,6 +128,10 @@ typedef struct {
int8_t type;
} SStreamCheckpoint;
typedef struct {
int8_t type;
} SStreamTaskDestroy;
typedef struct {
int8_t type;
SSDataBlock* pBlock;
......
......@@ -291,6 +291,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_STREAM_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03F1)
#define TSDB_CODE_MND_INVALID_STREAM_OPTION TAOS_DEF_ERROR_CODE(0, 0x03F2)
#define TSDB_CODE_MND_STREAM_MUST_BE_DELETED TAOS_DEF_ERROR_CODE(0, 0x03F3)
#define TSDB_CODE_MND_STREAM_TASK_DROPPED TAOS_DEF_ERROR_CODE(0, 0x03F4)
// mnode-sma
#define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480)
......
......@@ -424,6 +424,8 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
}
mndAddTaskToTaskSet(taskSourceLevel, pTask);
pTask->triggerParam = 0;
// source
pTask->taskLevel = TASK_LEVEL__SOURCE;
......
......@@ -2021,8 +2021,7 @@ static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName,
FOREACH(pNode, pNodeList) {
SColumnNode *pCol = (SColumnNode *)pNode;
if (pCol->tableId != suid) {
mDebug("topic:%s, check colId:%d passed", pTopic->name, pCol->colId);
if (pCol->tableId == suid) {
sdbRelease(pSdb, pTopic);
nodesDestroyNode(pAst);
return -1;
......@@ -2045,6 +2044,11 @@ static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName,
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) break;
if (pStream->smaId == 0 && pStream->targetStbUid == suid) {
sdbRelease(pSdb, pStream);
return -1;
}
SNode *pAst = NULL;
if (nodesStringToNode(pStream->ast, &pAst) != 0) {
ASSERT(0);
......@@ -2057,8 +2061,7 @@ static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName,
FOREACH(pNode, pNodeList) {
SColumnNode *pCol = (SColumnNode *)pNode;
if (pCol->tableId != suid) {
mDebug("stream:%s, check colId:%d passed", pStream->name, pCol->colId);
if (pCol->tableId == suid) {
sdbRelease(pSdb, pStream);
nodesDestroyNode(pAst);
return -1;
......
......@@ -628,8 +628,6 @@ int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe
}
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
int32_t code = 0;
if (pTask->taskLevel == TASK_LEVEL__AGG) {
ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0);
}
......@@ -640,8 +638,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
pTask->outputQueue = streamQueueOpen();
if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
code = -1;
goto FAIL;
return -1;
}
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
......@@ -686,14 +683,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
streamSetupTrigger(pTask);
tqInfo("deploy stream task on vg %d, task id %d, child id %d", TD_VID(pTq->pVnode), pTask->taskId,
tqInfo("expand stream task on vg %d, task id %d, child id %d", TD_VID(pTq->pVnode), pTask->taskId,
pTask->selfChildId);
FAIL:
if (pTask->inputQueue) streamQueueClose(pTask->inputQueue);
if (pTask->outputQueue) streamQueueClose(pTask->outputQueue);
// TODO free executor
return code;
return 0;
}
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
......
......@@ -32,7 +32,6 @@ typedef struct {
static SStreamGlobalEnv streamEnv;
int32_t streamExec(SStreamTask* pTask);
int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch);
int32_t streamDispatch(SStreamTask* pTask);
......
......@@ -185,7 +185,9 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
tFreeStreamDispatchReq(pReq);
if (exec) {
streamTryExec(pTask);
if (streamTryExec(pTask) < 0) {
return -1;
}
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
streamDispatch(pTask);
......@@ -221,7 +223,9 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) {
}
int32_t streamProcessRunReq(SStreamTask* pTask) {
streamTryExec(pTask);
if (streamTryExec(pTask) < 0) {
return -1;
}
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
streamDispatch(pTask);
......
......@@ -99,16 +99,19 @@ int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char*
goto FAIL;
}
taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*));
if (taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) {
goto FAIL;
}
if (tdbTbUpsert(pMeta->pTaskDb, &pTask->taskId, sizeof(int32_t), msg, msgLen, &pMeta->txn) < 0) {
taosHashRemove(pMeta->pTasks, &pTask->taskId, sizeof(int32_t));
ASSERT(0);
return -1;
goto FAIL;
}
return 0;
FAIL:
if (pTask) taosMemoryFree(pTask);
if (pTask) tFreeSStreamTask(pTask);
return -1;
}
......@@ -158,11 +161,21 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
SStreamTask* pTask = *ppTask;
taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING);
}
if (tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), &pMeta->txn) < 0) {
/*return -1;*/
if (tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), &pMeta->txn) < 0) {
/*return -1;*/
}
while (1) {
int8_t schedStatus =
atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE, TASK_SCHED_STATUS__DROPPING);
if (schedStatus == TASK_SCHED_STATUS__INACTIVE) {
tFreeSStreamTask(pTask);
break;
}
}
}
return 0;
}
......
......@@ -38,7 +38,9 @@ void streamQueueClose(SStreamQueue* queue) {
if (qItem) {
taosFreeQitem(qItem);
} else {
return;
break;
}
}
taosFreeQall(queue->qall);
taosCloseQueue(queue->queue);
}
......@@ -152,8 +152,8 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
}
void tFreeSStreamTask(SStreamTask* pTask) {
streamQueueClose(pTask->inputQueue);
streamQueueClose(pTask->outputQueue);
if (pTask->inputQueue) streamQueueClose(pTask->inputQueue);
if (pTask->outputQueue) streamQueueClose(pTask->outputQueue);
if (pTask->exec.qmsg) taosMemoryFree(pTask->exec.qmsg);
if (pTask->exec.executor) qDestroyTask(pTask->exec.executor);
taosMemoryFree(pTask);
......
......@@ -293,6 +293,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_CGROUP_USED, "Consumer group being
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_ALREADY_EXIST, "Stream already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_NOT_EXIST, "Stream not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STREAM_OPTION, "Invalid stream option")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_MUST_BE_DELETED, "Stream must be dropped first")
// mnode-sma
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册