提交 402c091d 编写于 作者: H Haojun Liao

fix(stream): fix sma error.

上级 9d8f6f37
......@@ -267,11 +267,11 @@ typedef struct SCheckpointInfo {
typedef struct SStreamStatus {
int8_t taskStatus;
int8_t checkDownstream;
int8_t checkDownstream; // downstream tasks are all ready now, if this flag is set
int8_t schedStatus;
int8_t keepTaskStatus;
bool transferState;
TdThreadMutex lock;
int8_t timerActive; // timer is active
} SStreamStatus;
typedef struct SHistDataRange {
......
......@@ -168,16 +168,53 @@ void tqNotifyClose(STQ* pTq) {
}
SStreamTask* pTask = *(SStreamTask**)pIter;
tqDebug("vgId:%d s-task:%s set dropping flag", pTq->pStreamMeta->vgId, pTask->id.idStr);
tqDebug("vgId:%d s-task:%s set closing flag", pTq->pStreamMeta->vgId, pTask->id.idStr);
pTask->status.taskStatus = TASK_STATUS__STOP;
int64_t st = taosGetTimestampMs();
qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
int64_t el = taosGetTimestampMs() - st;
tqDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", pTq->pStreamMeta->vgId, pTask->id.idStr, el);
}
taosWUnLockLatch(&pTq->pStreamMeta->lock);
tqDebug("vgId:%d start to check all tasks", pTq->pStreamMeta->vgId);
int64_t st = taosGetTimestampMs();
while(1) {
taosMsleep(1000);
bool inTimer = false;
taosWLockLatch(&pTq->pStreamMeta->lock);
pIter = NULL;
while(1) {
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
if (pIter == NULL) {
break;
}
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->status.timerActive == 1) {
inTimer = true;
}
}
taosWUnLockLatch(&pTq->pStreamMeta->lock);
if (inTimer) {
tqDebug("vgId:%d some tasks in timer, wait for 1sec and recheck", pTq->pStreamMeta->vgId);
} else {
break;
}
}
int64_t el = taosGetTimestampMs() - st;
tqDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%"PRId64" ms", pTq->pStreamMeta->vgId, el);
}
}
......
......@@ -280,18 +280,53 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) {
}
void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
taosWLockLatch(&pMeta->lock);
SStreamTask* pTask = NULL;
// pre-delete operation
taosWLockLatch(&pMeta->lock);
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (ppTask) {
SStreamTask* pTask = *ppTask;
pTask = *ppTask;
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
} else {
qDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId);
taosWUnLockLatch(&pMeta->lock);
return;
}
taosWUnLockLatch(&pMeta->lock);
qDebug("s-task:0x%x set task status:%s", taskId, streamGetTaskStatusStr(TASK_STATUS__DROPPING));
while(1) {
taosRLockLatch(&pMeta->lock);
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (ppTask) {
if ((*ppTask)->status.timerActive == 0) {
taosRUnLockLatch(&pMeta->lock);
break;
}
taosMsleep(10);
qDebug("s-task:%s wait for quit from timer", (*ppTask)->id.idStr);
taosRUnLockLatch(&pMeta->lock);
} else {
taosRUnLockLatch(&pMeta->lock);
break;
}
}
// let's do delete of stream task
taosWLockLatch(&pMeta->lock);
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (ppTask) {
taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn);
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
int32_t num = taosArrayGetSize(pMeta->pTaskList);
ASSERT(pTask->status.timerActive == 0);
int32_t num = taosArrayGetSize(pMeta->pTaskList);
qDebug("s-task:%s set the drop task flag, remain running s-task:%d", pTask->id.idStr, num - 1);
for (int32_t i = 0; i < num; ++i) {
int32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i);
......
......@@ -446,20 +446,54 @@ static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) {
streamTaskCheckDownstreamTasks(pHTask);
}
typedef struct SStreamTaskRetryInfo {
SStreamMeta* pMeta;
int32_t taskId;
} SStreamTaskRetryInfo;
static void tryLaunchHistoryTask(void* param, void* tmrId) {
SStreamTask* pTask = param;
SStreamTaskRetryInfo* pInfo = param;
SStreamMeta* pMeta = pInfo->pMeta;
qDebug("s-task:0x%x in timer to launch history task", pInfo->taskId);
taosWLockLatch(&pMeta->lock);
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &pInfo->taskId, sizeof(int32_t));
if (ppTask) {
ASSERT((*ppTask)->status.timerActive == 1);
if (streamTaskShouldStop(&(*ppTask)->status)) {
qDebug("s-task:%s status:%s quit timer task", (*ppTask)->id.idStr,
streamGetTaskStatusStr((*ppTask)->status.taskStatus));
(*ppTask)->status.timerActive = 0;
taosWUnLockLatch(&pMeta->lock);
return;
}
}
taosWUnLockLatch(&pMeta->lock);
SStreamMeta* pMeta = pTask->pMeta;
SStreamTask** pHTask = taosHashGet(pMeta->pTasks, &pTask->historyTaskId.taskId, sizeof(pTask->historyTaskId.taskId));
if (pHTask == NULL) {
qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr,
pMeta->vgId, pTask->historyTaskId.taskId);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->taskId);
if (pTask != NULL) {
ASSERT(pTask->status.timerActive == 1);
taosTmrReset(tryLaunchHistoryTask, 100, pTask, streamEnv.timer, &pTask->timer);
return;
}
// abort the timer if intend to stop task
SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId);
if (pHTask == NULL && pTask->status.taskStatus == TASK_STATUS__NORMAL) {
qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it may not be built or have been destroyed",
pTask->id.idStr, pMeta->vgId, pTask->historyTaskId.taskId);
doCheckDownstreamStatus(pTask, *pHTask);
taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->timer);
return;
}
doCheckDownstreamStatus(pTask, pHTask);
// not in timer anymore
pTask->status.timerActive = 0;
streamMetaReleaseTask(pMeta, pHTask);
streamMetaReleaseTask(pMeta, pTask);
} else {
qError("s-task:0x%x failed to load task", pInfo->taskId);
}
}
// todo fix the bug: 2. race condition
......@@ -474,9 +508,16 @@ int32_t streamCheckHistoryTaskDownstrem(SStreamTask* pTask) {
pMeta->vgId, pTask->historyTaskId.taskId);
if (pTask->timer == NULL) {
pTask->timer = taosTmrStart(tryLaunchHistoryTask, 100, pTask, streamEnv.timer);
SStreamTaskRetryInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamTaskRetryInfo));
pInfo->taskId = pTask->id.taskId;
pInfo->pMeta = pTask->pMeta;
pTask->timer = taosTmrStart(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer);
if (pTask->timer == NULL) {
// todo failed to create timer
} else {
pTask->status.timerActive = 1; // timer is active
qDebug("s-task:%s set time active flag", pTask->id.idStr);
}
}
......@@ -602,6 +643,7 @@ int32_t tDecodeStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishRe
return 0;
}
// todo handle race condition, this task may be destroyed
void streamPrepareNdoCheckDownstream(SStreamTask* pTask) {
if (pTask->info.fillHistory) {
qDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr);
......
......@@ -22,7 +22,7 @@ class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
tdSql.init(conn.cursor(), True)
self.dbname = 'db'
self.setsql = TDSetSql()
self.stbname = 'stb'
......@@ -76,8 +76,8 @@ class TDTestCase:
tdSql.execute(f'drop database {self.dbname}')
def run(self):
self.topic_name_check()
self.db_name_check()
# self.topic_name_check()
# self.db_name_check()
self.stream_name_check()
def stop(self):
tdSql.close()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册