提交 95d81604 编写于 作者: L liuyao

feat:encode&&decode stream scan op

上级 524bb214
......@@ -80,6 +80,10 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers);
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema);
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo);
// todo refactor
void qGetCheckpointVersion(qTaskInfo_t tinfo, int64_t* dataVer, int64_t* ckId);
/**
* Set multiple input data blocks for the stream scan.
* @param tinfo
......
......@@ -59,6 +59,7 @@ typedef struct {
SStreamFileState* pFileState;
int32_t number;
SSHashObj* parNameMap;
int64_t checkPointId;
} SStreamState;
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages);
......
......@@ -143,7 +143,8 @@ typedef struct {
int64_t fillHistoryVer1;
int64_t fillHistoryVer2;
int64_t dataVersion;
SStreamState* pState;
SStreamState* pState;
int64_t checkPointId;
} SStreamTaskInfo;
typedef struct {
......
......@@ -176,6 +176,12 @@ int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) {
return code;
}
void qGetCheckpointVersion(qTaskInfo_t tinfo, int64_t* dataVer, int64_t* ckId) {
SExecTaskInfo* pTaskInfo = tinfo;
*dataVer = pTaskInfo->streamInfo.dataVersion;
*ckId = pTaskInfo->streamInfo.checkPointId;
}
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
if (tinfo == NULL) {
return TSDB_CODE_APP_ERROR;
......
......@@ -1742,6 +1742,26 @@ static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey, SSDataBlock* pBl
}
}
int32_t streamScanOperatorEncode(SStreamScanInfo* pInfo, void** pBuff) {
int32_t len = updateInfoSerialize(NULL, 0, pInfo->pUpdateInfo);
*pBuff = taosMemoryCalloc(1, len);
updateInfoSerialize(*pBuff, len, pInfo->pUpdateInfo);
return len;
}
// other properties are recovered from the execution plan
void streamScanOperatorDeocde(void* pBuff, int32_t len, SStreamScanInfo* pInfo) {
if (!pBuff) {
return;
}
SUpdateInfo *pUpInfo = updateInfoInit(0, TSDB_TIME_PRECISION_MILLI, 0);
int32_t code = updateInfoDeserialize(pBuff, len, pUpInfo);
if (code == TSDB_CODE_SUCCESS) {
pInfo->pUpdateInfo = pUpInfo;
}
}
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
// NOTE: this operator does never check if current status is done or not
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
......@@ -2000,6 +2020,10 @@ FETCH_NEXT_BLOCK:
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
doClearBufferedBlocks(pInfo);
qDebug("stream scan return empty, consume block %d", totBlockNum);
void* buff = NULL;
int32_t len = streamScanOperatorEncode(pInfo, &buff);
//todo(liuyao) save buff
taosMemoryFreeClear(buff);
return NULL;
}
......@@ -2419,6 +2443,11 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->igExpired = pTableScanNode->igExpired;
pInfo->twAggSup.maxTs = INT64_MIN;
//todo(liuyao) get buff from rocks db;
void* buff = NULL;
int32_t len = 0;
streamScanOperatorDeocde(buff, len, pInfo);
setOperatorInfo(pOperator, "StreamScanOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
......
......@@ -2409,8 +2409,9 @@ static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pN
return startPos;
}
static void setStreamDataVersion(SExecTaskInfo* pTaskInfo, int64_t version) {
static void setStreamDataVersion(SExecTaskInfo* pTaskInfo, int64_t version, int64_t ckId) {
pTaskInfo->streamInfo.dataVersion = version;
pTaskInfo->streamInfo.checkPointId = ckId;
}
static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t groupId,
......@@ -2604,7 +2605,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
clearFunctionContext(&pOperator->exprSupp);
// semi interval operator clear disk buffer
clearStreamIntervalOperator(pInfo);
setStreamDataVersion(pTaskInfo, pInfo->dataVersion);
setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId);
qDebug("===stream===clear semi operator");
} else {
deleteIntervalDiscBuf(pInfo->pState, pInfo->pPullDataMap, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark,
......@@ -4870,7 +4871,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
setOperatorCompleted(pOperator);
if (pInfo->twAggSup.maxTs - pInfo->twAggSup.checkPointInterval > pInfo->twAggSup.checkPointTs) {
streamStateCommit(pInfo->pState);
setStreamDataVersion(pTaskInfo, pInfo->dataVersion);
setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId);
pInfo->twAggSup.checkPointTs = pInfo->twAggSup.maxTs;
}
return NULL;
......
......@@ -203,6 +203,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
}
pState->pTdbState->pOwner = pTask;
pState->checkPointId = 0;
return pState;
......@@ -266,6 +267,7 @@ int32_t streamStateCommit(SStreamState* pState) {
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
return -1;
}
pState->checkPointId++;
return 0;
#endif
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册