提交 15cceb5a 编写于 作者: H Haojun Liao

refactor: do some internal refactors.

上级 74da3c05
......@@ -287,6 +287,11 @@ typedef struct SStreamId {
const char* idStr;
} SStreamId;
typedef struct SCheckpointInfo {
int64_t id;
int64_t version; // offset in WAL
} SCheckpointInfo;
struct SStreamTask {
SStreamId id;
int32_t totalLevel;
......@@ -298,8 +303,8 @@ struct SStreamTask {
int32_t selfChildId;
int32_t nodeId;
SEpSet epSet;
int64_t recoverSnapVer;
int64_t startVer;
SCheckpointInfo chkInfo;
STaskExec exec;
// fill history
int8_t fillHistory;
......@@ -309,9 +314,6 @@ struct SStreamTask {
int32_t nextCheckId;
SArray* checkpointInfo; // SArray<SStreamCheckpointInfo>
// exec
STaskExec exec;
// output
union {
STaskDispatcherFixedEp fixedEpDispatcher;
......@@ -587,7 +589,7 @@ void streamMetaClose(SStreamMeta* streamMeta);
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask);
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen);
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t checkpointVer, char* msg, int32_t msgLen);
// SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId);
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId);
......
......@@ -76,7 +76,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
pTask->pMsgCb = &pSnode->msgCb;
pTask->startVer = ver;
pTask->chkInfo.version = ver;
pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1);
if (pTask->pState == NULL) {
......
......@@ -906,7 +906,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
pTask->pMsgCb = &pTq->pVnode->msgCb;
pTask->startVer = ver;
pTask->chkInfo.version = ver;
pTask->pMeta = pTq->pStreamMeta;
// expand executor
......@@ -981,7 +981,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
streamSetupTrigger(pTask);
tqInfo("vgId:%d expand stream task, s-task:%s, ver:%" PRId64 " child id:%d, level:%d", vgId, pTask->id.idStr,
pTask->startVer, pTask->selfChildId, pTask->taskLevel);
pTask->chkInfo.version, pTask->selfChildId, pTask->taskLevel);
return 0;
}
......@@ -1124,7 +1124,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
}
// check param
int64_t fillVer1 = pTask->startVer;
int64_t fillVer1 = pTask->chkInfo.version;
if (fillVer1 <= 0) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return -1;
......
......@@ -96,7 +96,7 @@ void initOffsetForAllRestoreTasks(STQ* pTq) {
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key);
if (pOffset == NULL) {
doSaveTaskOffset(pTq->pOffsetStore, key, pTask->startVer);
doSaveTaskOffset(pTq->pOffsetStore, key, pTask->chkInfo.version);
}
}
......
......@@ -2088,7 +2088,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
pBlockScanInfo->lastKey = tsLastBlock;
return TSDB_CODE_SUCCESS;
} else {
int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2112,7 +2112,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
}
}
} else { // not merge block data
int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2352,7 +2352,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
tsdbRowMergerAdd(&merge, piRow, piSchema);
} else {
init = true;
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
code = tsdbRowMergerInit(&merge, pSchema, piRow, piSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
......@@ -2575,7 +2575,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
SRow* pTSRow = NULL;
SRowMerger merge = {0};
int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -3243,7 +3243,7 @@ static int32_t readRowsCountFromFiles(STsdbReader* pReader) {
while (1) {
bool hasNext = false;
int32_t code = filesetIteratorNext(&pReader->status.fileIter, pReader, &hasNext);
code = filesetIteratorNext(&pReader->status.fileIter, pReader, &hasNext);
if (code) {
return code;
}
......@@ -3515,8 +3515,8 @@ SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_
int64_t startVer = (pCond->startVersion == -1) ? 0 : pCond->startVersion;
int64_t endVer = 0;
if (pCond->endVersion ==
-1) { // user not specified end version, set current maximum version of vnode as the endVersion
if (pCond->endVersion == -1) {
// user not specified end version, set current maximum version of vnode as the endVersion
endVer = pVnode->state.applied;
} else {
endVer = (pCond->endVersion > pVnode->state.applied) ? pVnode->state.applied : pCond->endVersion;
......
......@@ -288,9 +288,9 @@ int32_t streamExecForAll(SStreamTask* pTask) {
int64_t ckId = 0;
int64_t dataVer = 0;
qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId);
if (dataVer > pTask->startVer) { // save it since the checkpoint is updated
qDebug("s-task:%s exec end, checkpoint ver from %"PRId64" to %"PRId64, pTask->id.idStr, pTask->startVer, dataVer);
pTask->startVer = dataVer;
if (dataVer > pTask->chkInfo.version) { // save it since the checkpoint is updated
qDebug("s-task:%s exec end, checkpoint ver from %"PRId64" to %"PRId64, pTask->id.idStr, pTask->chkInfo.version, dataVer);
pTask->chkInfo = (SCheckpointInfo) {.version = dataVer, .id = ckId};
streamMetaSaveTask(pTask->pMeta, pTask);
if (streamMetaCommit(pTask->pMeta) < 0) {
......
......@@ -70,8 +70,8 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->recoverSnapVer) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->startVer) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->chkInfo.id) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->chkInfo.version) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->fillHistory) < 0) return -1;
int32_t epSz = taosArrayGetSize(pTask->childEpInfo);
......@@ -123,8 +123,8 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->recoverSnapVer) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->startVer) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->chkInfo.id) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->chkInfo.version) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->fillHistory) < 0) return -1;
int32_t epSz;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册