提交 693942b8 编写于 作者: L liuyao

trans state

上级 3b2f2f0a
......@@ -229,6 +229,9 @@ bool qStreamRecoverScanFinished(qTaskInfo_t tinfo);
void qStreamCloseTsdbReader(void* task);
void resetTaskInfo(qTaskInfo_t tinfo);
int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo);
int32_t qStreamOperatorReloadState(qTaskInfo_t tInfo);
#ifdef __cplusplus
}
#endif
......
......@@ -392,6 +392,7 @@ typedef struct SStateStore {
int32_t (*streamStateCommit)(SStreamState* pState);
void (*streamStateDestroy)(SStreamState* pState, bool remove);
int32_t (*streamStateDeleteCheckPoint)(SStreamState* pState, TSKEY mark);
void (*streamStateReloadInfo)(SStreamState* pState, TSKEY ts);
} SStateStore;
typedef struct SStorageAPI {
......
......@@ -138,6 +138,8 @@ int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char* tbname);
int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal);
void streamStateReloadInfo(SStreamState* pState, TSKEY ts);
/***compare func **/
typedef struct SStateChekpoint {
......
......@@ -620,6 +620,9 @@ int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask,
int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointReq* pReq);
int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointRsp* pRsp);
int32_t streamTaskReleaseState(SStreamTask* pTask);
int32_t streamTaskReloadState(SStreamTask* pTask);
#ifdef __cplusplus
}
#endif
......
......@@ -49,6 +49,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState);
int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list);
int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark);
int32_t streamFileStateGeSelectRowSize(SStreamFileState* pFileState);
void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts);
#ifdef __cplusplus
}
......
......@@ -101,6 +101,7 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateCommit = streamStateCommit;
pStore->streamStateDestroy= streamStateDestroy;
pStore->streamStateDeleteCheckPoint = streamStateDeleteCheckPoint;
pStore->streamStateReloadInfo = streamStateReloadInfo;
}
void initFunctionStateStore(SFunctionStateStore* pStore) {
......
......@@ -1111,6 +1111,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
if (pStreamTask == NULL) {
// todo handle error
}
// streamTaskReleaseState(pTask);
// streamTaskReloadState(pStreamTask);
ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
......
......@@ -203,6 +203,7 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateCommit = streamStateCommit;
pStore->streamStateDestroy = streamStateDestroy;
pStore->streamStateDeleteCheckPoint = streamStateDeleteCheckPoint;
pStore->streamStateReloadInfo = streamStateReloadInfo;
}
void initMetaReaderAPI(SStoreMetaReader* pMetaReader) {
......
......@@ -1326,4 +1326,16 @@ SArray* getTableListInfo(const SExecTaskInfo* pTaskInfo) {
SOperatorInfo* pOperator = pTaskInfo->pRoot;
extractTableList(pArray, pOperator);
return pArray;
}
\ No newline at end of file
}
int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tInfo;
pTaskInfo->pRoot->fpSet.releaseStreamStateFn(pTaskInfo->pRoot);
return 0;
}
int32_t qStreamOperatorReloadState(qTaskInfo_t tInfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tInfo;
pTaskInfo->pRoot->fpSet.reloadStreamStateFn(pTaskInfo->pRoot);
return 0;
}
......@@ -73,6 +73,20 @@ static void destroyIndefinitOperatorInfo(void* param) {
taosMemoryFreeClear(param);
}
void streamOperatorReleaseState(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.releaseStreamStateFn) {
downstream->fpSet.releaseStreamStateFn(downstream);
}
}
void streamOperatorReloadState(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.reloadStreamStateFn) {
downstream->fpSet.reloadStreamStateFn(downstream);
}
}
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode,
SExecTaskInfo* pTaskInfo) {
int32_t code = TSDB_CODE_SUCCESS;
......@@ -134,6 +148,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doProjectOperation, NULL, destroyProjectOperatorInfo,
optrDefaultBufFn, NULL);
setOperatorStreamStateFn(pOperator, streamOperatorReleaseState, streamOperatorReloadState);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
......
......@@ -28,6 +28,7 @@
#define IS_FINAL_OP(op) ((op)->isFinal)
#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL);
#define STREAM_INTERVAL_OP_STATE_NAME "StreamIntervalHistoryState"
#define STREAM_SESSION_OP_STATE_NAME "StreamSessionHistoryState"
#define STREAM_STATE_OP_STATE_NAME "StreamStateHistoryState"
......@@ -2724,8 +2725,10 @@ int32_t getMaxFunResSize(SExprSupp* pSup, int32_t numOfCols) {
}
void streamIntervalReleaseState(SOperatorInfo* pOperator) {
if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
return;
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
int32_t resSize = sizeof(TSKEY);
pInfo->statestore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME, strlen(STREAM_INTERVAL_OP_STATE_NAME), &pInfo->twAggSup.maxTs, resSize);
}
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
......@@ -2737,6 +2740,15 @@ void streamIntervalReleaseState(SOperatorInfo* pOperator) {
}
void streamIntervalReloadState(SOperatorInfo* pOperator) {
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
int32_t size = 0;
void* pBuf = NULL;
int32_t code = pInfo->statestore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME,
strlen(STREAM_INTERVAL_OP_STATE_NAME), &pBuf, &size);
TSKEY ts = *(TSKEY*)pBuf;
pInfo->statestore.streamStateReloadInfo(pInfo->pState, ts);
}
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.reloadStreamStateFn) {
downstream->fpSet.reloadStreamStateFn(downstream);
......@@ -3651,7 +3663,6 @@ void streamSessionReloadState(SOperatorInfo* pOperator) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
SResultWindowInfo winInfo = {0};
SSessionKey seKey = {.win.skey = INT64_MIN, .win.ekey = INT64_MIN, .groupId = 0};
int32_t size = 0;
void* pBuf = NULL;
int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_SESSION_OP_STATE_NAME,
......@@ -4352,7 +4363,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo,
optrDefaultBufFn, NULL);
setOperatorStreamStateFn(pOperator, streamStateReleaseState, streamSessionReloadState);
setOperatorStreamStateFn(pOperator, streamStateReleaseState, streamStateReloadState);
initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
......
......@@ -562,3 +562,15 @@ int32_t streamTryExec(SStreamTask* pTask) {
return 0;
}
int32_t streamTaskReleaseState(SStreamTask* pTask) {
void* exec = pTask->exec.pExecutor;
int32_t code = qStreamOperatorReleaseState(exec);
return code;
}
int32_t streamTaskReloadState(SStreamTask* pTask) {
void* exec = pTask->exec.pExecutor;
int32_t code = qStreamOperatorReloadState(exec);
return code;
}
......@@ -1113,6 +1113,10 @@ int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark) {
#endif
}
void streamStateReloadInfo(SStreamState* pState, TSKEY ts) {
streamFileStateReloadInfo(pState->pFileState, ts);
}
#if 0
char* streamStateSessionDump(SStreamState* pState) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
......
......@@ -524,3 +524,8 @@ int32_t recoverSnapshot(SStreamFileState* pFileState) {
}
int32_t streamFileStateGeSelectRowSize(SStreamFileState* pFileState) { return pFileState->selectivityRowSize; }
void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts) {
pFileState->flushMark = TMAX(pFileState->flushMark, ts);
pFileState->maxTs = TMAX(pFileState->maxTs, ts);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册