未验证 提交 23e285ac 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #20995 from taosdata/fix/liaohj_main

fix(stream): add lock during check wal to create new stream task.
......@@ -1309,7 +1309,7 @@ int32_t tqStartStreamTasks(STQ* pTq) {
return -1;
}
tqInfo("vgId:%d start wal scan stream tasks, tasks:%d", vgId, numOfTasks);
tqDebug("vgId:%d start wal scan stream tasks, tasks:%d", vgId, numOfTasks);
initOffsetForAllRestoreTasks(pTq);
pRunReq->head.vgId = vgId;
......
......@@ -15,8 +15,7 @@
#include "tq.h"
static int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, bool* pScanIdle);
static int32_t transferToNormalTask(SStreamMeta* pStreamMeta, SArray* pTaskList);
static int32_t doCreateReqsByScanWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, bool* pScanIdle);
// this function should be executed by stream threads.
// there is a case that the WAL increases more fast than the restore procedure, and this restore procedure
......@@ -32,7 +31,7 @@ int tqStreamTasksScanWal(STQ* pTq) {
// check all restore tasks
bool shouldIdle = true;
streamTaskReplayWal(pTq->pStreamMeta, pTq->pOffsetStore, &shouldIdle);
doCreateReqsByScanWal(pTq->pStreamMeta, pTq->pOffsetStore, &shouldIdle);
int32_t times = 0;
......@@ -55,50 +54,50 @@ int tqStreamTasksScanWal(STQ* pTq) {
int64_t el = (taosGetTimestampMs() - st);
tqDebug("vgId:%d scan wal for stream tasks completed, elapsed time:%"PRId64" ms", vgId, el);
// restore wal scan flag
// atomic_store_8(&pTq->pStreamMeta->walScan, 0);
return 0;
}
//int32_t transferToNormalTask(SStreamMeta* pStreamMeta, SArray* pTaskList) {
// int32_t numOfTask = taosArrayGetSize(pTaskList);
// if (numOfTask <= 0) {
// return TSDB_CODE_SUCCESS;
// }
//
// // todo: add lock
// for (int32_t i = 0; i < numOfTask; ++i) {
// SStreamTask* pTask = taosArrayGetP(pTaskList, i);
// tqDebug("vgId:%d transfer s-task:%s state restore -> ready, checkpoint:%" PRId64 " checkpoint id:%" PRId64,
// pStreamMeta->vgId, pTask->id.idStr, pTask->chkInfo.version, pTask->chkInfo.id);
// taosHashRemove(pStreamMeta->pWalReadTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
//
// // NOTE: do not change the following order
// atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
// taosHashPut(pStreamMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES);
// }
//
// return TSDB_CODE_SUCCESS;
//}
int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, bool* pScanIdle) {
static SArray* extractTaskIdList(SStreamMeta* pStreamMeta, int32_t numOfTasks) {
SArray* pTaskIdList = taosArrayInit(numOfTasks, sizeof(int32_t));
void* pIter = NULL;
int32_t vgId = pStreamMeta->vgId;
*pScanIdle = true;
bool allWalChecked = true;
tqDebug("vgId:%d start to check wal to extract new submit block", vgId);
while (1) {
taosWLockLatch(&pStreamMeta->lock);
while(1) {
pIter = taosHashIterate(pStreamMeta->pTasks, pIter);
if (pIter == NULL) {
break;
}
SStreamTask* pTask = *(SStreamTask**)pIter;
taosArrayPush(pTaskIdList, &pTask->id.taskId);
}
taosWUnLockLatch(&pStreamMeta->lock);
return pTaskIdList;
}
int32_t doCreateReqsByScanWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, bool* pScanIdle) {
*pScanIdle = true;
bool noNewDataInWal = true;
int32_t vgId = pStreamMeta->vgId;
int32_t numOfTasks = taosHashGetSize(pStreamMeta->pTasks);
if (numOfTasks == 0) {
return TSDB_CODE_SUCCESS;
}
tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks);
SArray* pTaskIdList = extractTaskIdList(pStreamMeta, numOfTasks);
for (int32_t i = 0; i < numOfTasks; ++i) {
int32_t* pTaskId = taosArrayGet(pTaskIdList, i);
SStreamTask* pTask = streamMetaAcquireTask(pStreamMeta, *pTaskId);
if (pTask == NULL) {
continue;
}
if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
......@@ -106,6 +105,7 @@ int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetSto
pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr,
pTask->status.taskStatus);
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
......@@ -115,6 +115,7 @@ int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetSto
if (tInputQueueIsFull(pTask)) {
tqDebug("vgId:%d s-task:%s input queue is full, do nothing", vgId, pTask->id.idStr);
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
......@@ -127,6 +128,7 @@ int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetSto
// seek the stored version and extract data from WAL
int32_t code = walReadSeekVer(pTask->exec.pWalReader, pOffset->val.version);
if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
......@@ -136,6 +138,7 @@ int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetSto
SPackedData packData = {0};
code = extractSubmitMsgFromWal(pTask->exec.pWalReader, &packData);
if (code != TSDB_CODE_SUCCESS) { // failed, continue
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
......@@ -143,10 +146,11 @@ int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetSto
if (p == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("%s failed to create data submit for stream since out of memory", pTask->id.idStr);
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
allWalChecked = false;
noNewDataInWal = false;
tqDebug("s-task:%s submit data extracted from WAL", pTask->id.idStr);
code = tqAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)p, packData.ver);
......@@ -160,11 +164,15 @@ int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetSto
streamDataSubmitDestroy(p);
taosFreeQitem(p);
streamMetaReleaseTask(pStreamMeta, pTask);
}
if (allWalChecked) {
// all wal are checked, and no new data available in wal.
if (noNewDataInWal) {
*pScanIdle = true;
}
taosArrayDestroy(pTaskIdList);
return 0;
}
......@@ -84,11 +84,6 @@ void streamMetaClose(SStreamMeta* pMeta) {
tdbClose(pMeta->db);
void* pIter = NULL;
// while(pMeta->walScan) {
// qDebug("wait stream daemon quit");
// taosMsleep(100);
// }
while (1) {
pIter = taosHashIterate(pMeta->pTasks, pIter);
if (pIter == NULL) {
......@@ -102,7 +97,6 @@ void streamMetaClose(SStreamMeta* pMeta) {
}
tFreeStreamTask(pTask);
/*streamMetaReleaseTask(pMeta, pTask);*/
}
taosHashCleanup(pMeta->pTasks);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册