提交 015c00aa 编写于 作者: L Liu Jicong

fix(stream): timer refer stream task

上级 6478fcb2
...@@ -51,6 +51,7 @@ void streamSchedByTimer(void* param, void* tmrId) { ...@@ -51,6 +51,7 @@ void streamSchedByTimer(void* param, void* tmrId) {
SStreamTask* pTask = (void*)param; SStreamTask* pTask = (void*)param;
if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) { if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) {
streamMetaReleaseTask(NULL, pTask);
return; return;
} }
...@@ -80,6 +81,8 @@ void streamSchedByTimer(void* param, void* tmrId) { ...@@ -80,6 +81,8 @@ void streamSchedByTimer(void* param, void* tmrId) {
int32_t streamSetupTrigger(SStreamTask* pTask) { int32_t streamSetupTrigger(SStreamTask* pTask) {
if (pTask->triggerParam != 0) { if (pTask->triggerParam != 0) {
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
ASSERT(ref == 2);
pTask->timer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer); pTask->timer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer);
pTask->triggerStatus = TASK_TRIGGER_STATUS__INACTIVE; pTask->triggerStatus = TASK_TRIGGER_STATUS__INACTIVE;
} }
......
...@@ -80,7 +80,12 @@ void streamMetaClose(SStreamMeta* pMeta) { ...@@ -80,7 +80,12 @@ void streamMetaClose(SStreamMeta* pMeta) {
pIter = taosHashIterate(pMeta->pTasks, pIter); pIter = taosHashIterate(pMeta->pTasks, pIter);
if (pIter == NULL) break; if (pIter == NULL) break;
SStreamTask* pTask = *(SStreamTask**)pIter; SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->timer) {
taosTmrStop(pTask->timer);
pTask->timer = NULL;
}
tFreeSStreamTask(pTask); tFreeSStreamTask(pTask);
/*streamMetaReleaseTask(pMeta, pTask);*/
} }
taosHashCleanup(pMeta->pTasks); taosHashCleanup(pMeta->pTasks);
taosMemoryFree(pMeta->path); taosMemoryFree(pMeta->path);
...@@ -202,6 +207,10 @@ void streamMetaRemoveTask1(SStreamMeta* pMeta, int32_t taskId) { ...@@ -202,6 +207,10 @@ void streamMetaRemoveTask1(SStreamMeta* pMeta, int32_t taskId) {
if (ppTask) { if (ppTask) {
SStreamTask* pTask = *ppTask; SStreamTask* pTask = *ppTask;
taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t)); taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
/*if (pTask->timer) {
* taosTmrStop(pTask->timer);*/
/*pTask->timer = NULL;*/
/*}*/
atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING); atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING);
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
......
...@@ -102,6 +102,7 @@ int32_t walCommit(SWal *pWal, int64_t ver) { ...@@ -102,6 +102,7 @@ int32_t walCommit(SWal *pWal, int64_t ver) {
int32_t walRollback(SWal *pWal, int64_t ver) { int32_t walRollback(SWal *pWal, int64_t ver) {
taosThreadMutexLock(&pWal->mutex); taosThreadMutexLock(&pWal->mutex);
wInfo("vgId:%d, wal rollback for version %" PRId64, pWal->cfg.vgId, ver);
int64_t code; int64_t code;
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
if (ver > pWal->vers.lastVer || ver < pWal->vers.commitVer || ver <= pWal->vers.snapshotVer) { if (ver > pWal->vers.lastVer || ver < pWal->vers.commitVer || ver <= pWal->vers.snapshotVer) {
...@@ -123,8 +124,10 @@ int32_t walRollback(SWal *pWal, int64_t ver) { ...@@ -123,8 +124,10 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
int fileSetSize = taosArrayGetSize(pWal->fileInfoSet); int fileSetSize = taosArrayGetSize(pWal->fileInfoSet);
for (int i = pWal->writeCur + 1; i < fileSetSize; i++) { for (int i = pWal->writeCur + 1; i < fileSetSize; i++) {
walBuildLogName(pWal, ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr); walBuildLogName(pWal, ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr);
wDebug("vgId:%d, wal remove file %s for rollback", pWal->cfg.vgId, fnameStr);
taosRemoveFile(fnameStr); taosRemoveFile(fnameStr);
walBuildIdxName(pWal, ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr); walBuildIdxName(pWal, ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr);
wDebug("vgId:%d, wal remove file %s for rollback", pWal->cfg.vgId, fnameStr);
taosRemoveFile(fnameStr); taosRemoveFile(fnameStr);
} }
// pop from fileInfoSet // pop from fileInfoSet
...@@ -157,6 +160,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) { ...@@ -157,6 +160,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
walBuildLogName(pWal, walGetCurFileFirstVer(pWal), fnameStr); walBuildLogName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
TdFilePtr pLogFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND); TdFilePtr pLogFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
wDebug("vgId:%d, wal truncate file %s", pWal->cfg.vgId, fnameStr);
if (pLogFile == NULL) { if (pLogFile == NULL) {
// TODO // TODO
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
...@@ -324,9 +328,9 @@ int32_t walEndSnapshot(SWal *pWal) { ...@@ -324,9 +328,9 @@ int32_t walEndSnapshot(SWal *pWal) {
pInfo++; pInfo++;
} }
if (POINTER_DISTANCE(pInfo, pWal->fileInfoSet->pData) > 0) { if (POINTER_DISTANCE(pInfo, pWal->fileInfoSet->pData) > 0) {
wDebug("vgId:%d, begin remove from %" PRId64, pWal->cfg.vgId, pInfo->firstVer); wDebug("vgId:%d, wal end remove from %" PRId64, pWal->cfg.vgId, pInfo->firstVer);
} else { } else {
wDebug("vgId:%d, no remove", pWal->cfg.vgId); wDebug("vgId:%d, wal no remove", pWal->cfg.vgId);
} }
// iterate files, until the searched result // iterate files, until the searched result
for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) { for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) {
...@@ -343,12 +347,12 @@ int32_t walEndSnapshot(SWal *pWal) { ...@@ -343,12 +347,12 @@ int32_t walEndSnapshot(SWal *pWal) {
for (int i = 0; i < deleteCnt; i++) { for (int i = 0; i < deleteCnt; i++) {
pInfo = taosArrayGet(pWal->fileInfoSet, i); pInfo = taosArrayGet(pWal->fileInfoSet, i);
walBuildLogName(pWal, pInfo->firstVer, fnameStr); walBuildLogName(pWal, pInfo->firstVer, fnameStr);
wDebug("vgId:%d, remove file %s", pWal->cfg.vgId, fnameStr); wDebug("vgId:%d, wal remove file %s", pWal->cfg.vgId, fnameStr);
if (taosRemoveFile(fnameStr) < 0) { if (taosRemoveFile(fnameStr) < 0) {
goto UPDATE_META; goto UPDATE_META;
} }
walBuildIdxName(pWal, pInfo->firstVer, fnameStr); walBuildIdxName(pWal, pInfo->firstVer, fnameStr);
wDebug("vgId:%d, remove file %s", pWal->cfg.vgId, fnameStr); wDebug("vgId:%d, wal remove file %s", pWal->cfg.vgId, fnameStr);
if (taosRemoveFile(fnameStr) < 0) { if (taosRemoveFile(fnameStr) < 0) {
ASSERT(0); ASSERT(0);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册