未验证 提交 f78208c3 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #22280 from taosdata/fix/TD-25496

reload state window state
...@@ -4057,13 +4057,20 @@ bool isEqualStateKey(SStateWindowInfo* pWin, char* pKeyData) { ...@@ -4057,13 +4057,20 @@ bool isEqualStateKey(SStateWindowInfo* pWin, char* pKeyData) {
bool compareStateKey(void* data, void* key) { bool compareStateKey(void* data, void* key) {
if (!data || !key) { if (!data || !key) {
return false; return true;
} }
SStateKeys* stateKey = (SStateKeys*)key; SStateKeys* stateKey = (SStateKeys*)key;
stateKey->pData = (char*)key + sizeof(SStateKeys); stateKey->pData = (char*)key + sizeof(SStateKeys);
return compareVal(data, stateKey); return compareVal(data, stateKey);
} }
bool compareWinStateKey(SStateKeys* left, SStateKeys* right) {
if (!left || !right) {
return false;
}
return compareVal(left->pData, right);
}
void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, char* pKeyData, void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, char* pKeyData,
SStateWindowInfo* pCurWin, SStateWindowInfo* pNextWin) { SStateWindowInfo* pCurWin, SStateWindowInfo* pNextWin) {
int32_t size = pAggSup->resultRowSize; int32_t size = pAggSup->resultRowSize;
...@@ -4086,10 +4093,14 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, ...@@ -4086,10 +4093,14 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
pCurWin->winInfo.pOutputBuf = taosMemoryCalloc(1, size); pCurWin->winInfo.pOutputBuf = taosMemoryCalloc(1, size);
pCurWin->pStateKey = pCurWin->pStateKey =
(SStateKeys*)((char*)pCurWin->winInfo.pOutputBuf + (pAggSup->resultRowSize - pAggSup->stateKeySize)); (SStateKeys*)((char*)pCurWin->winInfo.pOutputBuf + (pAggSup->resultRowSize - pAggSup->stateKeySize));
pCurWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys); pCurWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys);
pCurWin->pStateKey->type = pAggSup->stateKeyType; pCurWin->pStateKey->type = pAggSup->stateKeyType;
pCurWin->pStateKey->pData = (char*)pCurWin->pStateKey + sizeof(SStateKeys); pCurWin->pStateKey->pData = (char*)pCurWin->pStateKey + sizeof(SStateKeys);
pCurWin->pStateKey->isNull = false; pCurWin->pStateKey->isNull = false;
pCurWin->winInfo.sessionWin.groupId = groupId;
pCurWin->winInfo.sessionWin.win.skey = ts;
pCurWin->winInfo.sessionWin.win.ekey = ts;
qDebug("===stream===reset state win key. skey:%" PRId64 ", endkey:%" PRId64, pCurWin->winInfo.sessionWin.win.skey, pCurWin->winInfo.sessionWin.win.ekey);
} }
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
...@@ -4241,6 +4252,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { ...@@ -4241,6 +4252,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;
SStreamStateAggOperatorInfo* pInfo = pOperator->info; SStreamStateAggOperatorInfo* pInfo = pOperator->info;
SOptrBasicInfo* pBInfo = &pInfo->binfo; SOptrBasicInfo* pBInfo = &pInfo->binfo;
qDebug("===stream=== stream state agg");
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator); doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) { if (pInfo->pDelRes->info.rows > 0) {
...@@ -4340,6 +4352,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { ...@@ -4340,6 +4352,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
void streamStateReleaseState(SOperatorInfo* pOperator) { void streamStateReleaseState(SOperatorInfo* pOperator) {
SStreamStateAggOperatorInfo* pInfo = pOperator->info; SStreamStateAggOperatorInfo* pInfo = pOperator->info;
int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey); int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey);
qDebug("===stream=== relase state. save result count:%d", (int32_t)taosArrayGetSize(pInfo->historyWins));
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_STATE_NAME, strlen(STREAM_STATE_OP_STATE_NAME), pInfo->historyWins->pData, resSize); pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_STATE_NAME, strlen(STREAM_STATE_OP_STATE_NAME), pInfo->historyWins->pData, resSize);
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.releaseStreamStateFn) { if (downstream->fpSet.releaseStreamStateFn) {
...@@ -4365,6 +4378,7 @@ static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCur ...@@ -4365,6 +4378,7 @@ static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCur
compactFunctions(pSup->pCtx, pAggSup->pDummyCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData); compactFunctions(pSup->pCtx, pAggSup->pDummyCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
tSimpleHashRemove(pStUpdated, &pNextWin->sessionWin, sizeof(SSessionKey)); tSimpleHashRemove(pStUpdated, &pNextWin->sessionWin, sizeof(SSessionKey));
if (pNextWin->isOutput && pStDeleted) { if (pNextWin->isOutput && pStDeleted) {
qDebug("===stream=== save delete window info %" PRId64 ", %" PRIu64, pNextWin->sessionWin.win.skey, pNextWin->sessionWin.groupId);
saveDeleteRes(pStDeleted, pNextWin->sessionWin); saveDeleteRes(pStDeleted, pNextWin->sessionWin);
} }
removeSessionResult(pStUpdated, pAggSup->pResultRows, pNextWin->sessionWin); removeSessionResult(pStUpdated, pAggSup->pResultRows, pNextWin->sessionWin);
...@@ -4383,19 +4397,28 @@ void streamStateReloadState(SOperatorInfo* pOperator) { ...@@ -4383,19 +4397,28 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_STATE_OP_STATE_NAME, int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_STATE_OP_STATE_NAME,
strlen(STREAM_STATE_OP_STATE_NAME), &pBuf, &size); strlen(STREAM_STATE_OP_STATE_NAME), &pBuf, &size);
int32_t num = size / sizeof(SSessionKey); int32_t num = size / sizeof(SSessionKey);
qDebug("===stream=== reload state. get result count:%d", num);
SSessionKey* pSeKeyBuf = (SSessionKey*) pBuf; SSessionKey* pSeKeyBuf = (SSessionKey*) pBuf;
ASSERT(size == num * sizeof(SSessionKey)); ASSERT(size == num * sizeof(SSessionKey));
if (!pInfo->pSeUpdated && num > 0) { if (!pInfo->pSeUpdated && num > 0) {
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pSeUpdated = tSimpleHashInit(64, hashFn); pInfo->pSeUpdated = tSimpleHashInit(64, hashFn);
} }
if (!pInfo->pSeDeleted && num > 0) {
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pSeDeleted = tSimpleHashInit(64, hashFn);
}
for (int32_t i = 0; i < num; i++) { for (int32_t i = 0; i < num; i++) {
SStateWindowInfo curInfo = {0}; SStateWindowInfo curInfo = {0};
SStateWindowInfo nextInfo = {0}; SStateWindowInfo nextInfo = {0};
SStateWindowInfo dummy = {0}; SStateWindowInfo dummy = {0};
qDebug("===stream=== reload state. try process result %" PRId64 ", %" PRIu64 ", index:%d", pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, i);
setStateOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, NULL, &curInfo, &nextInfo); setStateOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, NULL, &curInfo, &nextInfo);
if (compareStateKey(curInfo.pStateKey,nextInfo.pStateKey)) { bool cpRes = compareWinStateKey(curInfo.pStateKey,nextInfo.pStateKey);
compactStateWindow(pOperator, &curInfo.winInfo, &nextInfo.winInfo, pInfo->pSeUpdated, pInfo->pSeUpdated); qDebug("===stream=== reload state. next window info %" PRId64 ", %" PRIu64 ", compare:%d", nextInfo.winInfo.sessionWin.win.skey, nextInfo.winInfo.sessionWin.groupId, cpRes);
if (cpRes) {
compactStateWindow(pOperator, &curInfo.winInfo, &nextInfo.winInfo, pInfo->pSeUpdated, pInfo->pSeDeleted);
qDebug("===stream=== reload state. save result %" PRId64 ", %" PRIu64, curInfo.winInfo.sessionWin.win.skey, curInfo.winInfo.sessionWin.groupId);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
saveResult(curInfo.winInfo, pInfo->pSeUpdated); saveResult(curInfo.winInfo, pInfo->pSeUpdated);
} else if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { } else if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
......
...@@ -581,6 +581,7 @@ int32_t streamTryExec(SStreamTask* pTask) { ...@@ -581,6 +581,7 @@ int32_t streamTryExec(SStreamTask* pTask) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
streamSchedExec(pTask);
} else { } else {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),
......
...@@ -729,6 +729,7 @@ void streamStateFreeVal(void* val) { ...@@ -729,6 +729,7 @@ void streamStateFreeVal(void* val) {
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) { int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB #ifdef USE_ROCKSDB
qDebug("===stream===save skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64, key->win.skey,key->win.ekey, key->groupId);
return streamStateSessionPut_rocksdb(pState, key, value, vLen); return streamStateSessionPut_rocksdb(pState, key, value, vLen);
#else #else
SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册