未验证 提交 5c0747aa 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #15877 from taosdata/feature/stream

enh(stream): stream recover
......@@ -513,6 +513,7 @@ SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamMetaBegin(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamMetaRollBack(SStreamMeta* pMeta);
int32_t streamLoadTasks(SStreamMeta* pMeta);
#ifdef __cplusplus
}
......
......@@ -19,23 +19,23 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
void* exec = pTask->exec.executor;
// set input
SStreamQueueItem* pItem = (SStreamQueueItem*)data;
const SStreamQueueItem* pItem = (const SStreamQueueItem*)data;
if (pItem->type == STREAM_INPUT__GET_RES) {
SStreamTrigger* pTrigger = (SStreamTrigger*)data;
const SStreamTrigger* pTrigger = (const SStreamTrigger*)data;
qSetMultiStreamInput(exec, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)data;
qDebug("task %d %p set submit input %p %p %d 1", pTask->taskId, pTask, pSubmit, pSubmit->data, *pSubmit->dataRef);
qSetMultiStreamInput(exec, pSubmit->data, 1, STREAM_INPUT__DATA_SUBMIT);
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
SStreamDataBlock* pBlock = (SStreamDataBlock*)data;
SArray* blocks = pBlock->blocks;
const SStreamDataBlock* pBlock = (const SStreamDataBlock*)data;
SArray* blocks = pBlock->blocks;
qDebug("task %d %p set ssdata input", pTask->taskId, pTask);
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)data;
SArray* blocks = pMerged->reqs;
const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)data;
SArray* blocks = pMerged->reqs;
qDebug("task %d %p set submit input (merged), batch num: %d", pTask->taskId, pTask, (int32_t)blocks->size);
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__MERGED_SUBMIT);
} else {
......@@ -51,8 +51,8 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
}
if (output == NULL) {
if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
SSDataBlock block = {0};
SStreamDataBlock* pRetrieveBlock = (SStreamDataBlock*)data;
SSDataBlock block = {0};
const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)data;
ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1);
assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0));
block.info.type = STREAM_PULL_OVER;
......
......@@ -48,8 +48,18 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->ahandle = ahandle;
pMeta->expandFunc = expandFunc;
if (streamLoadTasks(pMeta) < 0) {
goto _err;
}
return pMeta;
_err:
if (pMeta->path) taosMemoryFree(pMeta->path);
if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks);
if (pMeta->pStateDb) tdbTbClose(pMeta->pStateDb);
if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb);
if (pMeta->db) tdbClose(pMeta->db);
taosMemoryFree(pMeta);
return NULL;
}
......
......@@ -130,14 +130,13 @@ int32_t tDecodeSStreamMultiVgCheckpointInfo(SDecoder* pDecoder, SStreamMultiVgCh
return 0;
}
int32_t streamCheckSinkLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
void* buf = NULL;
ASSERT(pTask->taskLevel == TASK_LEVEL__SINK);
int32_t sz = taosArrayGetSize(pTask->checkpointInfo);
SStreamMultiVgCheckpointInfo checkpoint;
checkpoint.checkpointId = 0;
checkpoint.checkpointId = atomic_fetch_add_32(&pTask->nextCheckId, 1);
checkpoint.checkTs = taosGetTimestampMs();
checkpoint.streamId = pTask->streamId;
checkpoint.taskId = pTask->taskId;
......@@ -169,16 +168,21 @@ int32_t streamCheckSinkLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
goto FAIL;
}
int32_t sz = taosArrayGetSize(pTask->checkpointInfo);
for (int32_t i = 0; i < sz; i++) {
SStreamCheckpointInfo* pCheck = taosArrayGet(pTask->checkpointInfo, i);
pCheck->stateSaveVer = pCheck->stateProcessedVer;
}
taosMemoryFree(buf);
return 0;
FAIL:
if (buf) taosMemoryFree(buf);
return -1;
return 0;
}
int32_t streamRecoverSinkLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
ASSERT(pTask->taskLevel == TASK_LEVEL__SINK);
// load status
int32_t streamLoadStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
void* pVal = NULL;
int32_t vLen = 0;
if (tdbTbGet(pMeta->pStateDb, &pTask->taskId, sizeof(void*), &pVal, &vLen) < 0) {
......@@ -196,29 +200,71 @@ int32_t streamRecoverSinkLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
return 0;
}
int32_t streamCheckAggLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
int32_t streamSaveSinkLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
ASSERT(pTask->taskLevel == TASK_LEVEL__SINK);
return streamSaveStateInfo(pMeta, pTask);
}
int32_t streamRecoverSinkLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
ASSERT(pTask->taskLevel == TASK_LEVEL__SINK);
return streamLoadStateInfo(pMeta, pTask);
}
int32_t streamSaveAggLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
ASSERT(pTask->taskLevel == TASK_LEVEL__AGG);
// save and copy state
// TODO save and copy state
// save state info
if (streamSaveStateInfo(pMeta, pTask) < 0) {
return -1;
}
return 0;
}
int32_t streamFetchSinkStatus(SStreamTask* pTask) {
ASSERT(pTask->taskLevel != TASK_LEVEL__SINK);
// set self status to recover_phase1
// build fetch status msg
// send fetch msg
return 0;
}
int32_t streamProcessFetchStatusRsp(SStreamMeta* pMeta, SStreamTask* pTask, void* msg) {
// if failed, set timer and retry
// if successful
// add rsp state to partial recover hash
// if complete, begin actual recover
return 0;
}
int32_t streamRecoverAggLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
ASSERT(pTask->taskLevel == TASK_LEVEL__AGG);
// try recover sink level
// after all sink level recovered, choose current state backend to recover
// recover sink level
// after all sink level recovered
// choose suitable state to recover
return 0;
}
int32_t streamCheckSourceLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
int32_t streamSaveSourceLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
// try recover agg level
//
// TODO: save and copy state
return 0;
}
int32_t streamRecoverSourceLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
// if totLevel == 3
// fetch agg state
// recover from local state to agg state, not send msg
// recover from agg state to most recent log v1
// enable input queue, set status recover_phase2
// recover from v1 to queue msg v2, set status normal
// if totLevel == 2
// fetch sink state
// recover from local state to sink state v1, send msg
// enable input queue, set status recover_phase2
// recover from v1 to queue msg v2, set status normal
return 0;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册