提交 22a5a18f 编写于 作者: H Haojun Liao

fix(stream): 1. set correct timewindow after step2. 2. handle the case when...

fix(stream): 1. set correct timewindow after step2. 2. handle the case when the task is failed to be added into the meta store.
上级 3fdbd538
...@@ -647,9 +647,9 @@ void streamMetaClose(SStreamMeta* streamMeta); ...@@ -647,9 +647,9 @@ void streamMetaClose(SStreamMeta* streamMeta);
// save to b-tree meta store // save to b-tree meta store
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask); int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded);
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId); int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta); // todo remove it int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); // todo remove it
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId); SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
......
...@@ -160,7 +160,9 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) { ...@@ -160,7 +160,9 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
// 2.save task // 2.save task
taosWLockLatch(&pSnode->pMeta->lock); taosWLockLatch(&pSnode->pMeta->lock);
code = streamMetaRegisterTask(pSnode->pMeta, -1, pTask);
bool added = false;
code = streamMetaRegisterTask(pSnode->pMeta, -1, pTask, &added);
if (code < 0) { if (code < 0) {
taosWUnLockLatch(&pSnode->pMeta->lock); taosWUnLockLatch(&pSnode->pMeta->lock);
return -1; return -1;
......
...@@ -1039,27 +1039,36 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms ...@@ -1039,27 +1039,36 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
SStreamMeta* pStreamMeta = pTq->pStreamMeta; SStreamMeta* pStreamMeta = pTq->pStreamMeta;
// 2.save task, use the newest commit version as the initial start version of stream task. // 2.save task, use the newest commit version as the initial start version of stream task.
int32_t taskId = 0; int32_t taskId = pTask->id.taskId;
taosWLockLatch(&pStreamMeta->lock); bool added = false;
code = streamMetaRegisterTask(pStreamMeta, sversion, pTask);
taskId = pTask->id.taskId; taosWLockLatch(&pStreamMeta->lock);
code = streamMetaRegisterTask(pStreamMeta, sversion, pTask, &added);
int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta);
if (code < 0) { if (code < 0) {
tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, numOfTasks); tqError("vgId:%d failed to add s-task:0x%x, total:%d", vgId, pTask->id.taskId, numOfTasks);
tFreeStreamTask(pTask); tFreeStreamTask(pTask);
taosWUnLockLatch(&pStreamMeta->lock); taosWUnLockLatch(&pStreamMeta->lock);
return -1; return -1;
} }
// not added into meta store
if (!added) {
tqWarn("vgId:%d failed to add s-task:0x%x, already exists in meta store", vgId, taskId);
tFreeStreamTask(pTask);
pTask = NULL;
}
taosWUnLockLatch(&pStreamMeta->lock); taosWUnLockLatch(&pStreamMeta->lock);
tqDebug("vgId:%d s-task:%s is deployed and add into meta, status:%s, numOfTasks:%d", vgId, pTask->id.idStr,
streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks); tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks);
// 3. It's an fill history task, do nothing. wait for the main task to start it // 3. It's an fill history task, do nothing. wait for the main task to start it
SStreamTask* p = streamMetaAcquireTask(pStreamMeta, taskId); SStreamTask* p = streamMetaAcquireTask(pStreamMeta, taskId);
if (p != NULL) { if (p != NULL) { // reset the downstreamReady flag.
streamTaskCheckDownstreamTasks(pTask); p->status.downstreamReady = 0;
streamTaskCheckDownstreamTasks(p);
} }
streamMetaReleaseTask(pStreamMeta, p); streamMetaReleaseTask(pStreamMeta, p);
...@@ -1174,7 +1183,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1174,7 +1183,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
if (!streamTaskRecoverScanStep1Finished(pTask)) { if (!streamTaskRecoverScanStep1Finished(pTask)) {
STimeWindow* pWindow = &pTask->dataRange.window; STimeWindow* pWindow = &pTask->dataRange.window;
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64 tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64
", do secondary scan-history data after halt the related stream task:%s", ", do secondary scan-history from WAL after halt the related stream task:%s",
id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, id); id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, id);
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
...@@ -1216,12 +1225,13 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1216,12 +1225,13 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
if (pTask->historyTaskId.taskId == 0) { if (pTask->historyTaskId.taskId == 0) {
*pWindow = (STimeWindow){INT64_MIN, INT64_MAX}; *pWindow = (STimeWindow){INT64_MIN, INT64_MAX};
tqDebug( tqDebug(
"s-task:%s scan history in stream time window completed, no related fill-history task, reset the time " "s-task:%s scanhistory in stream time window completed, no related fill-history task, reset the time "
"window:%" PRId64 " - %" PRId64, "window:%" PRId64 " - %" PRId64,
id, pWindow->skey, pWindow->ekey); id, pWindow->skey, pWindow->ekey);
qResetStreamInfoTimeWindow(pTask->exec.pExecutor);
} else { } else {
tqDebug( tqDebug(
"s-task:%s scan history in stream time window completed, now start to handle data from WAL, start " "s-task:%s scan-history in stream time window completed, now start to handle data from WAL, start "
"ver:%" PRId64 ", window:%" PRId64 " - %" PRId64, "ver:%" PRId64 ", window:%" PRId64 " - %" PRId64,
id, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey); id, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey);
} }
......
...@@ -35,7 +35,10 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v ...@@ -35,7 +35,10 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v
tqProcessSubmitReqForSubscribe(pTq); tqProcessSubmitReqForSubscribe(pTq);
} }
taosRLockLatch(&pTq->pStreamMeta->lock);
int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
taosRUnLockLatch(&pTq->pStreamMeta->lock);
tqDebug("handle submit, restore:%d, size:%d", pTq->pVnode->restored, numOfTasks); tqDebug("handle submit, restore:%d, size:%d", pTq->pVnode->restored, numOfTasks);
// push data for stream processing: // push data for stream processing:
......
...@@ -122,8 +122,9 @@ void qResetStreamInfoTimeWindow(qTaskInfo_t tinfo) { ...@@ -122,8 +122,9 @@ void qResetStreamInfoTimeWindow(qTaskInfo_t tinfo) {
return; return;
} }
qDebug("%s set fill history start key:%" PRId64, GET_TASKID(pTaskInfo), INT64_MIN); qDebug("%s set stream fill-history window:%" PRId64"-%"PRId64, GET_TASKID(pTaskInfo), INT64_MIN, INT64_MAX);
pTaskInfo->streamInfo.fillHistoryWindow.skey = INT64_MIN; pTaskInfo->streamInfo.fillHistoryWindow.skey = INT64_MIN;
pTaskInfo->streamInfo.fillHistoryWindow.ekey = INT64_MAX;
} }
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, const char* id) { static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, const char* id) {
......
...@@ -237,7 +237,9 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { ...@@ -237,7 +237,9 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
} }
// add to the ready tasks hash map, not the restored tasks hash map // add to the ready tasks hash map, not the restored tasks hash map
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) { int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded) {
*pAdded = false;
void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId)); void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
if (p == NULL) { if (p == NULL) {
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
...@@ -261,13 +263,13 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa ...@@ -261,13 +263,13 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
} }
taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES); taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES);
*pAdded = true;
return 0; return 0;
} }
int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta) { int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) {
size_t size = taosHashGetSize(pMeta->pTasks); size_t size = taosHashGetSize(pMeta->pTasks);
ASSERT(taosArrayGetSize(pMeta->pTaskList) == taosHashGetSize(pMeta->pTasks)); ASSERT(taosArrayGetSize(pMeta->pTaskList) == taosHashGetSize(pMeta->pTasks));
return (int32_t)size; return (int32_t)size;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册