提交 c58bde97 编写于 作者: L liuyao

stream operator fill history

上级 fb24ed16
......@@ -503,6 +503,8 @@ typedef struct SStreamSessionAggOperatorInfo {
SArray* pUpdated;
SSHashObj* pStUpdated;
int64_t dataVersion;
SArray* historyWins;
bool isHistoryOp;
} SStreamSessionAggOperatorInfo;
typedef struct SStreamStateAggOperatorInfo {
......@@ -522,6 +524,8 @@ typedef struct SStreamStateAggOperatorInfo {
SArray* pUpdated;
SSHashObj* pSeUpdated;
int64_t dataVersion;
bool isHistoryOp;
SArray* historyWins;
} SStreamStateAggOperatorInfo;
typedef struct SStreamPartitionOperatorInfo {
......
......@@ -35,6 +35,7 @@ typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr);
typedef void (*__optr_close_fn_t)(void* param);
typedef int32_t (*__optr_explain_fn_t)(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len);
typedef int32_t (*__optr_reqBuf_fn_t)(struct SOperatorInfo* pOptr);
typedef void (*__optr_state_fn_t)(struct SOperatorInfo* pOptr);
typedef struct SOperatorFpSet {
__optr_open_fn_t _openFn; // DO NOT invoke this function directly
......@@ -45,6 +46,8 @@ typedef struct SOperatorFpSet {
__optr_encode_fn_t encodeResultRow;
__optr_decode_fn_t decodeResultRow;
__optr_explain_fn_t getExplainFn;
__optr_state_fn_t releaseStreamStateFn;
__optr_state_fn_t reloadStreamStateFn;
} SOperatorFpSet;
enum {
......@@ -143,6 +146,7 @@ SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNo
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup,
__optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn, __optr_explain_fn_t explain);
void setOperatorStreamStateFn(SOperatorInfo* pOperator, __optr_state_fn_t relaseFn, __optr_state_fn_t reloadFn);
int32_t optrDummyOpenFn(SOperatorInfo* pOperator);
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num);
void setOperatorCompleted(SOperatorInfo* pOperator);
......
......@@ -38,11 +38,18 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn,
.closeFn = closeFn,
.reqBufFn = reqBufFn,
.getExplainFn = explain,
.releaseStreamStateFn = NULL,
.reloadStreamStateFn = NULL,
};
return fpSet;
}
void setOperatorStreamStateFn(SOperatorInfo* pOperator, __optr_state_fn_t relaseFn, __optr_state_fn_t reloadFn) {
pOperator->fpSet.releaseStreamStateFn = relaseFn;
pOperator->fpSet.reloadStreamStateFn = reloadFn;
}
int32_t optrDummyOpenFn(SOperatorInfo* pOperator) {
OPTR_SET_OPENED(pOperator);
pOperator->cost.openCost = 0;
......
......@@ -44,6 +44,7 @@ int32_t scanDebug = 0;
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
#define STREAM_SCAN_OP_NAME "StreamScanOperator"
#define STREAM_SCAN_OP_STATE_NAME "StreamScanFillHistoryState"
typedef struct STableMergeScanExecInfo {
SFileBlockLoadRecorder blockRecorder;
......@@ -2317,6 +2318,47 @@ static void destroyStreamScanOperatorInfo(void* param) {
taosMemoryFree(pStreamScan);
}
void streamScanReleaseState(SOperatorInfo* pOperator) {
SStreamScanInfo* pInfo = pOperator->info;
if (!pInfo->pUpdateInfo) {
return;
}
int32_t len = pInfo->stateStore.updateInfoSerialize(NULL, 0, pInfo->pUpdateInfo);
void* pBuff = taosMemoryCalloc(1, len);
pInfo->stateStore.updateInfoSerialize(pBuff, len, pInfo->pUpdateInfo);
pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_STATE_NAME, strlen(STREAM_SCAN_OP_STATE_NAME), pBuff, len);
}
void streamScanReloadState(SOperatorInfo* pOperator) {
SStreamScanInfo* pInfo = pOperator->info;
void* pBuff = NULL;
int32_t len = 0;
pInfo->stateStore.streamStateGetInfo(pInfo->pState, STREAM_SCAN_OP_STATE_NAME, strlen(STREAM_SCAN_OP_STATE_NAME), &pBuff, &len);
SUpdateInfo* pUpInfo = pInfo->stateStore.updateInfoInit(0, TSDB_TIME_PRECISION_MILLI, 0);
int32_t code = pInfo->stateStore.updateInfoDeserialize(pBuff, len, pUpInfo);
if (code == TSDB_CODE_SUCCESS && pInfo->pUpdateInfo) {
if (pInfo->pUpdateInfo->minTS < 0) {
pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo);
pInfo->pUpdateInfo = pUpInfo;
} else {
pInfo->pUpdateInfo->minTS = TMAX(pInfo->pUpdateInfo->minTS, pUpInfo->minTS);
pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pUpInfo->maxDataVersion);
SHashObj* curMap = pInfo->pUpdateInfo->pMap;
void *pIte = taosHashIterate(curMap, NULL);
while (pIte != NULL) {
size_t keySize = 0;
int64_t* pUid = taosHashGetKey(pIte, &keySize);
taosHashPut(pUpInfo->pMap, pUid, sizeof(int64_t), pIte, sizeof(TSKEY));
pIte = taosHashIterate(curMap, pIte);
}
taosHashCleanup(curMap);
pInfo->pUpdateInfo->pMap = pUpInfo->pMap;
pUpInfo->pMap = NULL;
pInfo->stateStore.updateInfoDestroy(pUpInfo);
}
}
}
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
SArray* pColIds = NULL;
......@@ -2489,6 +2531,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
setOperatorInfo(pOperator, STREAM_SCAN_OP_NAME, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo,
pTaskInfo);
setOperatorStreamStateFn(pOperator, streamScanReleaseState, streamScanReloadState);
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
__optr_fn_t nextFn = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) ? doStreamScan : doQueueScan;
......
......@@ -28,6 +28,8 @@
#define IS_FINAL_OP(op) ((op)->isFinal)
#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL);
#define STREAM_SESSION_OP_STATE_NAME "StreamSessionHistoryState"
#define STREAM_STATE_OP_STATE_NAME "StreamStateHistoryState"
typedef struct SStateWindowInfo {
SResultWindowInfo winInfo;
......@@ -2721,6 +2723,26 @@ int32_t getMaxFunResSize(SExprSupp* pSup, int32_t numOfCols) {
return size;
}
void streamIntervalReleaseState(SOperatorInfo* pOperator) {
if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
return;
}
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
pAPI->stateStore.streamStateCommit(pInfo->pState);
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.releaseStreamStateFn) {
downstream->fpSet.releaseStreamStateFn(downstream);
}
}
void streamIntervalReloadState(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.reloadStreamStateFn) {
downstream->fpSet.reloadStreamStateFn(downstream);
}
}
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, int32_t numOfChild) {
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
......@@ -2830,6 +2852,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pOperator->fpSet = createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo,
optrDefaultBufFn, NULL);
setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState);
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
initIntervalDownStream(downstream, pPhyNode->type, pInfo);
}
......@@ -3463,6 +3486,26 @@ void doBuildSessionResult(SOperatorInfo* pOperator, void* pState, SGroupResInfo*
pBlock->info.id.groupId = 0;
buildSessionResultDataBlock(pOperator, pState, pBlock, &pOperator->exprSupp, pGroupResInfo);
}
void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins) {
int32_t size = taosArrayGetSize(pAllWins);
if (size == 0) {
return;
}
SSessionKey* pSeKey = taosArrayGet(pAllWins, size - 1);
taosArrayPush(pMaxWins, pSeKey);
if (pSeKey->groupId == 0) {
return;
}
uint64_t preGpId = pSeKey->groupId;
for (int32_t i = size - 2; i >= 0; i--) {
pSeKey = taosArrayGet(pAllWins, i);
if (preGpId != pSeKey->groupId) {
taosArrayPush(pMaxWins, pSeKey);
preGpId = pSeKey->groupId;
}
}
}
static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
SExprSupp* pSup = &pOperator->exprSupp;
......@@ -3563,6 +3606,9 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
removeSessionResults(pInfo->pStDeleted, pInfo->pUpdated);
tSimpleHashCleanup(pInfo->pStUpdated);
pInfo->pStUpdated = NULL;
if(pInfo->isHistoryOp) {
getMaxTsWins(pInfo->pUpdated, pInfo->historyWins);
}
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
pInfo->pUpdated = NULL;
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
......@@ -3589,6 +3635,42 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
return NULL;
}
void streamSessionReleaseState(SOperatorInfo* pOperator) {
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey);
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_STATE_NAME, strlen(STREAM_SESSION_OP_STATE_NAME), pInfo->historyWins->pData, resSize);
}
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.releaseStreamStateFn) {
downstream->fpSet.releaseStreamStateFn(downstream);
}
}
void streamSessionReloadState(SOperatorInfo* pOperator) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
SResultWindowInfo winInfo = {0};
SSessionKey seKey = {.win.skey = INT64_MIN, .win.ekey = INT64_MIN, .groupId = 0};
int32_t size = 0;
void* pBuf = NULL;
int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_SESSION_OP_STATE_NAME,
strlen(STREAM_SESSION_OP_STATE_NAME), &pBuf, &size);
int32_t num = size / sizeof(SSessionKey);
SSessionKey* pSeKeyBuf = (SSessionKey*) pBuf;
ASSERT(size == num * sizeof(SSessionKey));
for (int32_t i = 0; i < num; i++) {
SResultWindowInfo winInfo = {0};
setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo);
compactSessionWindow(pOperator, &winInfo, pInfo->pStUpdated, pInfo->pStDeleted);
}
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.reloadStreamStateFn) {
downstream->fpSet.reloadStreamStateFn(downstream);
}
}
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo) {
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
......@@ -3653,11 +3735,17 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
pInfo->pUpdated = NULL;
pInfo->pStUpdated = NULL;
pInfo->dataVersion = 0;
pInfo->historyWins = taosArrayInit(4, sizeof(SSessionKey));
if (!pInfo->historyWins) {
goto _error;
}
pInfo->isHistoryOp = false;
setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true,
OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo,
optrDefaultBufFn, NULL);
setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionReloadState);
if (downstream) {
initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup);
......@@ -3765,7 +3853,6 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
removeSessionResults(pInfo->pStDeleted, pInfo->pUpdated);
tSimpleHashCleanup(pInfo->pStUpdated);
pInfo->pStUpdated = NULL;
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
pInfo->pUpdated = NULL;
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
......@@ -3815,7 +3902,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAgg, NULL,
destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL);
}
setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionReloadState);
setOperatorInfo(pOperator, name, pPhyNode->type, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->operatorType = pPhyNode->type;
......@@ -3879,6 +3966,9 @@ bool isEqualStateKey(SStateWindowInfo* pWin, char* pKeyData) {
}
bool compareStateKey(void* data, void* key) {
if (!data || !key) {
return true;
}
SStateKeys* stateKey = (SStateKeys*)key;
stateKey->pData = (char*)key + sizeof(SStateKeys);
return compareVal(data, stateKey);
......@@ -3902,7 +3992,7 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
if (code == TSDB_CODE_SUCCESS) {
pCurWin->winInfo.isOutput = true;
} else {
} else if (pKeyData) {
if (IS_VAR_DATA_TYPE(pAggSup->stateKeyType)) {
varDataCopy(pCurWin->pStateKey->pData, pKeyData);
} else {
......@@ -4100,6 +4190,10 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
tSimpleHashCleanup(pInfo->pSeUpdated);
pInfo->pSeUpdated = NULL;
if(pInfo->isHistoryOp) {
getMaxTsWins(pInfo->pUpdated, pInfo->historyWins);
}
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
pInfo->pUpdated = NULL;
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
......@@ -4125,6 +4219,68 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
return NULL;
}
void streamStateReleaseState(SOperatorInfo* pOperator) {
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey);
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_STATE_NAME, strlen(STREAM_STATE_OP_STATE_NAME), pInfo->historyWins->pData, resSize);
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.releaseStreamStateFn) {
downstream->fpSet.releaseStreamStateFn(downstream);
}
}
static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCurWin, SResultWindowInfo* pNextWin,
SSHashObj* pStUpdated, SSHashObj* pStDeleted) {
SExprSupp* pSup = &pOperator->exprSupp;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
SResultRow* pCurResult = NULL;
int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
initSessionOutputBuf(pCurWin, &pCurResult, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
SResultRow* pWinResult = NULL;
initSessionOutputBuf(pNextWin, &pWinResult, pAggSup->pDummyCtx, numOfOutput, pSup->rowEntryInfoOffset);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pCurWin->sessionWin.win, true);
compactFunctions(pSup->pCtx, pAggSup->pDummyCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
tSimpleHashRemove(pStUpdated, &pNextWin->sessionWin, sizeof(SSessionKey));
if (pNextWin->isOutput && pStDeleted) {
saveDeleteRes(pStDeleted, pNextWin->sessionWin);
}
removeSessionResult(pStUpdated, pAggSup->pResultRows, pNextWin->sessionWin);
doDeleteSessionWindow(pAggSup, &pNextWin->sessionWin);
taosMemoryFree(pNextWin->pOutputBuf);
saveSessionOutputBuf(pAggSup, pCurWin);
}
void streamStateReloadState(SOperatorInfo* pOperator) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
SSessionKey seKey = {.win.skey = INT64_MIN, .win.ekey = INT64_MIN, .groupId = 0};
int32_t size = 0;
void* pBuf = NULL;
int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_STATE_OP_STATE_NAME,
strlen(STREAM_STATE_OP_STATE_NAME), &pBuf, &size);
int32_t num = size / sizeof(SSessionKey);
SSessionKey* pSeKeyBuf = (SSessionKey*) pBuf;
ASSERT(size == num * sizeof(SSessionKey));
for (int32_t i = 0; i < num; i++) {
SStateWindowInfo curInfo = {0};
SStateWindowInfo nextInfo = {0};
setStateOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, NULL, &curInfo, &nextInfo);
if (compareStateKey(curInfo.pStateKey,nextInfo.pStateKey)) {
compactStateWindow(pOperator, &curInfo.winInfo, &nextInfo.winInfo, pInfo->pStUpdated, pInfo->pStDeleted);
}
}
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.reloadStreamStateFn) {
downstream->fpSet.reloadStreamStateFn(downstream);
}
}
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo) {
SStreamStateWinodwPhysiNode* pStateNode = (SStreamStateWinodwPhysiNode*)pPhyNode;
......@@ -4186,11 +4342,17 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->pUpdated = NULL;
pInfo->pSeUpdated = NULL;
pInfo->dataVersion = 0;
pInfo->historyWins = taosArrayInit(4, sizeof(SSessionKey));
if (!pInfo->historyWins) {
goto _error;
}
pInfo->isHistoryOp = false;
setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo,
optrDefaultBufFn, NULL);
setOperatorStreamStateFn(pOperator, streamStateReleaseState, streamSessionReloadState);
initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
......@@ -5017,6 +5179,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamIntervalAgg, NULL,
destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL);
setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState);
pInfo->statestore = pTaskInfo->storageAPI.stateStore;
pInfo->recvGetAll = false;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册