未验证 提交 4be94940 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #21164 from taosdata/fix/stop_tsdb_read

fix(stream): secure the delete task operation. TD-1198
......@@ -1326,7 +1326,9 @@ int32_t tqStartStreamTasks(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
taosWLockLatch(&pMeta->lock);
int32_t numOfTasks = taosHashGetSize(pTq->pStreamMeta->pTasks);
if (numOfTasks == 0) {
tqInfo("vgId:%d no stream tasks exists", vgId);
......
......@@ -1039,6 +1039,5 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
}
taosWUnLockLatch(&pTq->pStreamMeta->lock);
return 0;
}
......@@ -36,6 +36,7 @@ int32_t tqStreamTasksScanWal(STQ* pTq) {
if (shouldIdle) {
taosWLockLatch(&pMeta->lock);
pMeta->walScanCounter -= 1;
times = pMeta->walScanCounter;
......
......@@ -313,6 +313,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
pTask->chkInfo = (SCheckpointInfo) {.version = dataVer, .id = ckId, .currentVer = pTask->chkInfo.currentVer};
taosWLockLatch(&pTask->pMeta->lock);
streamMetaSaveTask(pTask->pMeta, pTask);
if (streamMetaCommit(pTask->pMeta) < 0) {
taosWUnLockLatch(&pTask->pMeta->lock);
......
......@@ -216,12 +216,14 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (ppTask) {
SStreamTask* pTask = *ppTask;
taosWLockLatch(&pMeta->lock);
taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn);
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__STOP);
taosWLockLatch(&pMeta->lock);
streamMetaReleaseTask(pMeta, pTask);
taosWUnLockLatch(&pMeta->lock);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册