提交 8b12d4d3 编写于 作者: H Haojun Liao

fix(stream): secure the delete task operation. TD-1198

上级 38cbe0b7
...@@ -1326,7 +1326,9 @@ int32_t tqStartStreamTasks(STQ* pTq) { ...@@ -1326,7 +1326,9 @@ int32_t tqStartStreamTasks(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
int32_t numOfTasks = taosHashGetSize(pTq->pStreamMeta->pTasks); int32_t numOfTasks = taosHashGetSize(pTq->pStreamMeta->pTasks);
if (numOfTasks == 0) { if (numOfTasks == 0) {
tqInfo("vgId:%d no stream tasks exists", vgId); tqInfo("vgId:%d no stream tasks exists", vgId);
......
...@@ -1039,6 +1039,5 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { ...@@ -1039,6 +1039,5 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
} }
taosWUnLockLatch(&pTq->pStreamMeta->lock); taosWUnLockLatch(&pTq->pStreamMeta->lock);
return 0; return 0;
} }
...@@ -36,6 +36,7 @@ int32_t tqStreamTasksScanWal(STQ* pTq) { ...@@ -36,6 +36,7 @@ int32_t tqStreamTasksScanWal(STQ* pTq) {
if (shouldIdle) { if (shouldIdle) {
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
pMeta->walScanCounter -= 1; pMeta->walScanCounter -= 1;
times = pMeta->walScanCounter; times = pMeta->walScanCounter;
......
...@@ -313,6 +313,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { ...@@ -313,6 +313,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
pTask->chkInfo = (SCheckpointInfo) {.version = dataVer, .id = ckId, .currentVer = pTask->chkInfo.currentVer}; pTask->chkInfo = (SCheckpointInfo) {.version = dataVer, .id = ckId, .currentVer = pTask->chkInfo.currentVer};
taosWLockLatch(&pTask->pMeta->lock); taosWLockLatch(&pTask->pMeta->lock);
streamMetaSaveTask(pTask->pMeta, pTask); streamMetaSaveTask(pTask->pMeta, pTask);
if (streamMetaCommit(pTask->pMeta) < 0) { if (streamMetaCommit(pTask->pMeta) < 0) {
taosWUnLockLatch(&pTask->pMeta->lock); taosWUnLockLatch(&pTask->pMeta->lock);
......
...@@ -216,12 +216,14 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { ...@@ -216,12 +216,14 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (ppTask) { if (ppTask) {
SStreamTask* pTask = *ppTask; SStreamTask* pTask = *ppTask;
taosWLockLatch(&pMeta->lock);
taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t)); taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn); tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn);
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__STOP); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__STOP);
taosWLockLatch(&pMeta->lock);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册