未验证 提交 46ffcc01 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #16918 from taosdata/feature/TD-18820

feat(stream): optimize disc buff
......@@ -619,6 +619,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RSMA_EMPTY_INFO TAOS_DEF_ERROR_CODE(0, 0x3156)
#define TSDB_CODE_RSMA_INVALID_SCHEMA TAOS_DEF_ERROR_CODE(0, 0x3157)
#define TSDB_CODE_RSMA_REGEX_MATCH TAOS_DEF_ERROR_CODE(0, 0x3158)
#define TSDB_CODE_RSMA_STREAM_STATE_OPEN TAOS_DEF_ERROR_CODE(0, 0x3159)
#define TSDB_CODE_RSMA_STREAM_STATE_COMMIT TAOS_DEF_ERROR_CODE(0, 0x3160)
//index
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)
......
......@@ -385,9 +385,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 4);
if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 1, 1024, 0) != 0) return -1;
// tsNumOfQnodeFetchThreads = tsNumOfCores / 2;
// tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4);
// if (cfgAddInt32(pCfg, "numOfQnodeFetchThreads", tsNumOfQnodeFetchThreads, 1, 1024, 0) != 0) return -1;
// tsNumOfQnodeFetchThreads = tsNumOfCores / 2;
// tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4);
// if (cfgAddInt32(pCfg, "numOfQnodeFetchThreads", tsNumOfQnodeFetchThreads, 1, 1024, 0) != 0) return -1;
tsNumOfSnodeSharedThreads = tsNumOfCores / 4;
tsNumOfSnodeSharedThreads = TRANGE(tsNumOfSnodeSharedThreads, 2, 4);
......@@ -527,15 +527,15 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
pItem->stype = stype;
}
/*
pItem = cfgGetItem(tsCfg, "numOfQnodeFetchThreads");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsNumOfQnodeFetchThreads = numOfCores / 2;
tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4);
pItem->i32 = tsNumOfQnodeFetchThreads;
pItem->stype = stype;
}
*/
/*
pItem = cfgGetItem(tsCfg, "numOfQnodeFetchThreads");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsNumOfQnodeFetchThreads = numOfCores / 2;
tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4);
pItem->i32 = tsNumOfQnodeFetchThreads;
pItem->stype = stype;
}
*/
pItem = cfgGetItem(tsCfg, "numOfSnodeSharedThreads");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
......@@ -693,7 +693,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsNumOfVnodeSyncThreads = cfgGetItem(pCfg, "numOfVnodeSyncThreads")->i32;
tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32;
tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32;
// tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32;
// tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32;
tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32;
tsNumOfSnodeUniqueThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32;
tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64;
......@@ -941,10 +941,10 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32;
} else if (strcasecmp("numOfQnodeQueryThreads", name) == 0) {
tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32;
/*
} else if (strcasecmp("numOfQnodeFetchThreads", name) == 0) {
tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32;
*/
/*
} else if (strcasecmp("numOfQnodeFetchThreads", name) == 0) {
tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32;
*/
} else if (strcasecmp("numOfSnodeSharedThreads", name) == 0) {
tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32;
} else if (strcasecmp("numOfSnodeUniqueThreads", name) == 0) {
......
......@@ -146,6 +146,7 @@ struct SRSmaInfoItem {
uint16_t nScanned;
int32_t maxDelay; // ms
tmr_h tmrId;
void *pStreamState;
};
struct SRSmaInfo {
......@@ -224,8 +225,10 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type);
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer);
void tdRSmaQTaskInfoGetFileName(int32_t vid, int64_t version, char *outputName);
void tdRSmaQTaskInfoGetFullName(int32_t vid, int64_t version, const char *path, char *outputName);
void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t version, char *outputName);
void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t version, const char *path, char *outputName);
void tdRSmaQTaskInfoGetFullPath(int32_t vgId, int8_t level, const char *path, char *outputName);
void tdRSmaQTaskInfoGetFullPathEx(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName);
static FORCE_INLINE void tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
int32_t ref = T_REF_INC(pRSmaInfo);
......
......@@ -92,6 +92,18 @@ void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t version, const char *path,
tdGetVndFileName(vgId, path, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName);
}
void tdRSmaQTaskInfoGetFullPath(int32_t vgId, int8_t level, const char *path, char *outputName) {
tdGetVndDirName(vgId, path, VNODE_RSMA_DIR, true, outputName);
int32_t rsmaLen = strlen(outputName);
snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi8, level);
}
void tdRSmaQTaskInfoGetFullPathEx(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName) {
tdGetVndDirName(vgId, path, VNODE_RSMA_DIR, true, outputName);
int32_t rsmaLen = strlen(outputName);
snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi64 "%s%" PRIi8, suid, TD_DIRSEP, level);
}
static FORCE_INLINE int32_t tdRSmaQTaskInfoContLen(int32_t lenWithHead) {
return lenWithHead - RSMA_QTASKINFO_HEAD_LEN;
}
......@@ -130,6 +142,10 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) {
taosTmrStopA(&pItem->tmrId);
}
if (isDeepFree && pItem->pStreamState) {
streamStateClose(pItem->pStreamState);
}
if (isDeepFree && pInfo->taskInfo[i]) {
tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1);
} else {
......@@ -290,12 +306,33 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
SRetention *pRetention = SMA_RETENTION(pSma);
STsdbCfg *pTsdbCfg = SMA_TSDB_CFG(pSma);
SVnode *pVnode = pSma->pVnode;
char taskInfDir[TSDB_FILENAME_LEN] = {0};
void *pStreamState = NULL;
// set the backend of stream state
tdRSmaQTaskInfoGetFullPathEx(TD_VID(pVnode), pRSmaInfo->suid, idx + 1, tfsGetPrimaryPath(pVnode->pTfs), taskInfDir);
if (!taosCheckExistFile(taskInfDir)) {
char *s = strdup(taskInfDir);
if (taosMulMkDir(taosDirName(s)) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
taosMemoryFree(s);
return TSDB_CODE_FAILED;
}
taosMemoryFree(s);
}
pStreamState = streamStateOpen(taskInfDir, NULL, true);
if (!pStreamState) {
terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN;
return TSDB_CODE_FAILED;
}
SReadHandle handle = {
.meta = pVnode->pMeta,
.vnode = pVnode,
.initTqReader = 1,
.pStateBackend = pStreamState,
};
pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle);
if (!pRSmaInfo->taskInfo[idx]) {
terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE;
......@@ -303,6 +340,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
}
SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]);
pItem->triggerStat = TASK_TRIGGER_STAT_ACTIVE; // fetch the data when reboot
pItem->pStreamState = pStreamState;
if (param->maxdelay[idx] < TSDB_MIN_ROLLUP_MAX_DELAY) {
int64_t msInterval =
convertTimeFromPrecisionToUnit(pRetention[idx + 1].freq, pTsdbCfg->precision, TIME_UNIT_MILLISECOND);
......@@ -322,7 +360,6 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
pItem->fetchLevel = pItem->level;
taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
smaInfo("vgId:%d, item:%p table:%" PRIi64 " level:%" PRIi8 " maxdelay:%" PRIi64 " watermark:%" PRIi64
", finally maxdelay:%" PRIi32,
......@@ -1226,16 +1263,17 @@ int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer)
if (tdRSmaRestoreQTaskInfoInit(pSma, &nTables) < 0) {
goto _err;
}
if (nTables <= 0) {
smaDebug("vgId:%d, no need to restore rsma task %" PRIi8 " since no tables", SMA_VID(pSma), type);
return TSDB_CODE_SUCCESS;
}
#if 0
// step 2: retrieve qtaskinfo items from the persistence file(rsma/qtaskinfo) and restore
if (tdRSmaRestoreQTaskInfoReload(pSma, type, qtaskFileVer) < 0) {
goto _err;
}
#endif
// step 3: reload ts data from checkpoint
if (tdRSmaRestoreTSDataReload(pSma) < 0) {
......@@ -1440,6 +1478,50 @@ static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, int8_t type, SRSmaQTaskInfoIte
return TSDB_CODE_SUCCESS;
}
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
SSma *pSma = pRSmaStat->pSma;
SVnode *pVnode = pSma->pVnode;
int32_t vid = SMA_VID(pSma);
if (taosHashGetSize(pInfoHash) <= 0) {
return TSDB_CODE_SUCCESS;
}
int64_t fsMaxVer = tdRSmaFSMaxVer(pSma, pRSmaStat);
if (pRSmaStat->commitAppliedVer <= fsMaxVer) {
smaDebug("vgId:%d, rsma persist, no need as applied %" PRIi64 " not larger than fsMaxVer %" PRIi64, vid,
pRSmaStat->commitAppliedVer, fsMaxVer);
return TSDB_CODE_SUCCESS;
}
void *infoHash = NULL;
while ((infoHash = taosHashIterate(pInfoHash, infoHash))) {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
continue;
}
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, i);
if (pItem && pItem->pStreamState) {
if (streamStateCommit(pItem->pStreamState) < 0) {
terrno = TSDB_CODE_RSMA_STREAM_STATE_COMMIT;
goto _err;
}
smaDebug("vgId:%d, rsma persist, stream state commit success, table %" PRIi64 " level %d", vid, pRSmaInfo->suid,
i + 1);
}
}
}
return TSDB_CODE_SUCCESS;
_err:
smaError("vgId:%d, rsma persist failed since %s", vid, terrstr());
return TSDB_CODE_FAILED;
}
#if 0
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
SSma *pSma = pRSmaStat->pSma;
SVnode *pVnode = pSma->pVnode;
......@@ -1459,7 +1541,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
int64_t fsMaxVer = tdRSmaFSMaxVer(pSma, pRSmaStat);
if (pRSmaStat->commitAppliedVer <= fsMaxVer) {
smaDebug("vgId:%d, rsma persist, no need as applied %" PRIi64 " not larger than fsMaxVer %" PRIi64, vid,
pRSmaStat->commitAppliedVer, fsMaxVer);
pRSmaStat->commitAppliedVer, fsMaxVer);
return TSDB_CODE_SUCCESS;
}
......@@ -1579,6 +1661,8 @@ _err:
return TSDB_CODE_FAILED;
}
#endif
/**
* @brief trigger to get rsma result in async mode
*
......@@ -1926,7 +2010,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
if ((pEnv->flag & SMA_ENV_FLG_CLOSE) && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) {
smaDebug("vgId:%d, exec task end, flag:%" PRIi8 ", nBufItems:%" PRIi64, SMA_VID(pSma), pEnv->flag,
atomic_load_64(&pRSmaStat->nBufItems));
atomic_load_64(&pRSmaStat->nBufItems));
break;
}
}
......
......@@ -141,11 +141,8 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
ASSERT(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
ASSERT(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);
if (pRsp->withSchema) {
ASSERT(taosArrayGetSize(pRsp->blockSchema) == pRsp->blockNum);
} else {
ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0);
}
ASSERT(!pRsp->withSchema);
ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0);
if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
if (pRsp->blockNum > 0) {
......
......@@ -462,6 +462,7 @@ typedef struct SPartitionDataInfo {
typedef struct STimeWindowAggSupp {
int8_t calTrigger;
int64_t waterMark;
int64_t deleteMark;
TSKEY maxTs;
TSKEY minTs;
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
......@@ -1090,7 +1091,7 @@ int32_t setOutputBuf(STimeWindow* win, SResultRow** pResult, int64_t tableGroupI
int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
SExecTaskInfo* pTaskInfo);
int32_t releaseOutputBuf(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult);
int32_t saveOutput(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult, int32_t resSize);
int32_t saveOutputBuf(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult, int32_t resSize);
#ifdef __cplusplus
}
......
......@@ -4166,9 +4166,8 @@ int32_t setOutputBuf(STimeWindow* win, SResultRow** pResult, int64_t tableGroupI
};
char* value = NULL;
int32_t size = pAggSup->resultRowSize;
/*if (streamStateGet(pTaskInfo->streamInfo.pState, &key, (void**)&value, &size) < 0) {*/
/*value = taosMemoryCalloc(1, size);*/
/*}*/
tSimpleHashPut(pAggSup->pResultRowHashTable, &key, sizeof(SWinKey), NULL, 0);
if (streamStateAddIfNotExist(pTaskInfo->streamInfo.pState, &key, (void**)&value, &size) < 0) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
......@@ -4186,7 +4185,7 @@ int32_t releaseOutputBuf(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pR
return TSDB_CODE_SUCCESS;
}
int32_t saveOutput(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult, int32_t resSize) {
int32_t saveOutputBuf(SExecTaskInfo* pTaskInfo, SWinKey* pKey, SResultRow* pResult, int32_t resSize) {
streamStatePut(pTaskInfo->streamInfo.pState, pKey, pResult, resSize);
return TSDB_CODE_SUCCESS;
}
......@@ -4259,8 +4258,9 @@ int32_t buildDataBlockFromGroupRes(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock
}
}
}
releaseOutputBuf(pTaskInfo, &key, pRow);
pBlock->info.rows += pRow->numOfRows;
releaseOutputBuf(pTaskInfo, &key, pRow);
}
blockDataUpdateTsWindow(pBlock, 0);
return TSDB_CODE_SUCCESS;
......
......@@ -867,6 +867,10 @@ static int32_t saveWinResultRow(SResultRow* result, uint64_t groupId, SHashObj*
return saveWinResult(result->win.skey, result->pageId, result->offset, groupId, pUpdatedMap);
}
static int32_t saveWinResultInfo(TSKEY ts, uint64_t groupId, SHashObj* pUpdatedMap) {
return saveWinResult(ts, -1, -1, groupId, pUpdatedMap);
}
static int32_t saveResultRow(SResultRow* result, uint64_t groupId, SArray* pUpdated) {
return saveResult(result->win.skey, result->pageId, result->offset, groupId, pUpdated);
}
......@@ -918,12 +922,16 @@ static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) {
}
}
bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup) {
ASSERT(pSup->maxTs == INT64_MIN || pSup->maxTs > 0);
return pSup->maxTs != INT64_MIN && ts < pSup->maxTs - pSup->waterMark;
bool isOverdue(TSKEY ekey, STimeWindowAggSupp* pTwSup) {
ASSERT(pTwSup->maxTs == INT64_MIN || pTwSup->maxTs > 0);
return pTwSup->maxTs != INT64_MIN && ekey < pTwSup->maxTs - pTwSup->waterMark;
}
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup) { return isOverdue(pWin->ekey, pSup); }
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pTwSup) { return isOverdue(pWin->ekey, pTwSup); }
bool needDeleteWindowBuf(STimeWindow* pWin, STimeWindowAggSupp* pTwSup) {
return pTwSup->maxTs != INT64_MIN && pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark;
}
static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
int32_t scanFlag) {
......@@ -1374,6 +1382,41 @@ static bool doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData,
return true;
}
static bool doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, int32_t numOfOutput) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SWinKey key = {.ts = ts, .groupId = groupId};
tSimpleHashRemove(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey));
streamStateDel(pOperator->pTaskInfo->streamInfo.pState, &key);
return true;
}
static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, int32_t numOfOutput, SSDataBlock* pBlock,
SArray* pUpWins, SHashObj* pUpdatedMap) {
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* startTsCols = (TSKEY*)pStartTsCol->pData;
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
TSKEY* endTsCols = (TSKEY*)pEndTsCol->pData;
SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
uint64_t* pGpDatas = (uint64_t*)pGpCol->pData;
for (int32_t i = 0; i < pBlock->info.rows; i++) {
SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCols[i], pInterval, TSDB_ORDER_ASC);
while (win.ekey <= endTsCols[i]) {
uint64_t winGpId = pGpDatas[i];
bool res = doDeleteWindow(pOperator, win.skey, winGpId, numOfOutput);
SWinKey winRes = {.ts = win.skey, .groupId = winGpId};
if (pUpWins && res) {
taosArrayPush(pUpWins, &winRes);
}
if (pUpdatedMap) {
taosHashRemove(pUpdatedMap, &winRes, sizeof(SWinKey));
}
getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win);
}
}
}
bool doDeleteIntervalWindow(SAggSupporter* pAggSup, TSKEY ts, uint64_t groupId) {
size_t bytes = sizeof(TSKEY);
SET_RES_WINDOW_KEY(pAggSup->keyBuf, &ts, bytes, groupId);
......@@ -1383,8 +1426,6 @@ bool doDeleteIntervalWindow(SAggSupporter* pAggSup, TSKEY ts, uint64_t groupId)
// window has been closed
return false;
}
// SFilePage* bufPage = getBufPage(pAggSup->pResultBuf, p1->pageId);
// dBufSetBufPageRecycled(pAggSup->pResultBuf, bufPage);
tSimpleHashRemove(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
return true;
}
......@@ -1512,6 +1553,49 @@ static int32_t closeIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pSup
return TSDB_CODE_SUCCESS;
}
static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SInterval* pInterval,
SHashObj* pPullDataMap, SHashObj* closeWins, SOperatorInfo* pOperator) {
qDebug("===stream===close interval window");
void* pIte = NULL;
size_t keyLen = 0;
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
void* key = tSimpleHashGetKey(pIte, &keyLen);
SWinKey* pWinKey = (SWinKey*)key;
void* chIds = taosHashGet(pPullDataMap, pWinKey, sizeof(SWinKey));
STimeWindow win = {
.skey = pWinKey->ts,
.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1,
};
if (isCloseWindow(&win, pTwSup)) {
if (chIds && pPullDataMap) {
SArray* chAy = *(SArray**)chIds;
int32_t size = taosArrayGetSize(chAy);
qDebug("===stream===window %" PRId64 " wait child size:%d", pWinKey->ts, size);
for (int32_t i = 0; i < size; i++) {
qDebug("===stream===window %" PRId64 " wait child id:%d", pWinKey->ts, *(int32_t*)taosArrayGet(chAy, i));
}
continue;
} else if (pPullDataMap) {
qDebug("===stream===close window %" PRId64, pWinKey->ts);
}
if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
int32_t code = saveWinResultInfo(pWinKey->ts, pWinKey->groupId, closeWins);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
tSimpleHashIterateRemove(pHashMap, pWinKey, sizeof(SWinKey), &pIte, &iter);
if (needDeleteWindowBuf(&win, pTwSup)) {
streamStateDel(pOperator->pTaskInfo->streamInfo.pState, pWinKey);
}
}
}
return TSDB_CODE_SUCCESS;
}
static void closeChildIntervalWindow(SArray* pChildren, TSKEY maxTs) {
int32_t size = taosArrayGetSize(pChildren);
for (int32_t i = 0; i < size; i++) {
......@@ -4918,8 +5002,8 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
SExprSupp* pSup = &pOperatorInfo->exprSupp;
SInterval* pInterval = &iaInfo->interval;
int32_t startPos = 0;
int64_t* tsCols = extractTsCol(pBlock, iaInfo);
int32_t startPos = 0;
int64_t* tsCols = extractTsCol(pBlock, iaInfo);
TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);
......@@ -4938,7 +5022,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
win.skey = miaInfo->curTs;
win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
int32_t ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
int32_t ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
T_LONG_JMP(pTaskInfo->env, ret);
}
......@@ -4963,7 +5047,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
currWin.ekey = taosTimeAdd(currWin.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
startPos = currPos;
ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
T_LONG_JMP(pTaskInfo->env, ret);
}
......@@ -5032,7 +5116,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
pMiaInfo->prefetchedBlock = pBlock;
cleanupAfterGroupResultGen(pMiaInfo, pRes);
break;
} else {
} else {
// continue
}
}
......@@ -5197,7 +5281,7 @@ static int32_t finalizeWindowResult(SOperatorInfo* pOperatorInfo, uint64_t table
SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(
iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
ASSERT(p1 != NULL);
// finalizeResultRows(iaInfo->aggSup.pResultBuf, p1, pResultBlock, pTaskInfo);
// finalizeResultRows(iaInfo->aggSup.pResultBuf, p1, pResultBlock, pTaskInfo);
tSimpleHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
return TSDB_CODE_SUCCESS;
}
......@@ -5222,7 +5306,7 @@ static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t t
STimeWindow* prevWin = &prevGrpWin->window;
if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) {
// finalizeWindowResult(pOperatorInfo, tableGroupId, prevWin, pResultBlock);
// finalizeWindowResult(pOperatorInfo, tableGroupId, prevWin, pResultBlock);
tdListPopNode(miaInfo->groupIntervals, listNode);
}
}
......@@ -5382,7 +5466,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
if (listNode != NULL) {
SGroupTimeWindow* grpWin = (SGroupTimeWindow*)(listNode->data);
// finalizeWindowResult(pOperator, grpWin->groupId, &grpWin->window, pRes);
// finalizeWindowResult(pOperator, grpWin->groupId, &grpWin->window, pRes);
pRes->info.groupId = grpWin->groupId;
}
}
......@@ -5591,7 +5675,7 @@ static void doStreamIntervalAggImpl2(SOperatorInfo* pOperatorInfo, SSDataBlock*
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL,
TSDB_ORDER_ASC);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) {
saveWinResultRow(pResult, tableGroupId, pUpdatedMap);
saveWinResultInfo(pResult->win.skey, tableGroupId, pUpdatedMap);
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
......@@ -5600,7 +5684,7 @@ static void doStreamIntervalAggImpl2(SOperatorInfo* pOperatorInfo, SSDataBlock*
.ts = nextWin.skey,
.groupId = tableGroupId,
};
saveOutput(pTaskInfo, &key, pResult, pInfo->aggSup.resultRowSize);
saveOutputBuf(pTaskInfo, &key, pResult, pInfo->aggSup.resultRowSize);
releaseOutputBuf(pTaskInfo, &key, pResult);
int32_t prevEndPos = (forwardRows - 1) * step + startPos;
ASSERT(pSDataBlock->info.window.skey > 0 && pSDataBlock->info.window.ekey > 0);
......@@ -5645,7 +5729,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
return pInfo->pDelRes;
}
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doBuildResult(pOperator, pInfo->binfo.pRes, &pInfo->groupResInfo);
// doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainResults(&pInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
qDebug("===stream===single interval is done");
......@@ -5671,13 +5756,14 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
printDataBlock(pBlock, "single interval recv");
if (pBlock->info.type == STREAM_CLEAR) {
doClearWindows(&pInfo->aggSup, &pOperator->exprSupp, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock,
NULL);
doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, NULL, NULL);
qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo));
continue;
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
doDeleteSpecifyIntervalWindow(&pInfo->aggSup, &pInfo->twAggSup, pBlock, pInfo->pDelWins, &pInfo->interval,
pUpdatedMap);
// doDeleteSpecifyIntervalWindow(&pInfo->aggSup, &pInfo->twAggSup, pBlock, pInfo->pDelWins, &pInfo->interval,
// pUpdatedMap);
doDeleteWindows(pOperator, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, pInfo->pDelWins,
pUpdatedMap);
continue;
} else if (pBlock->info.type == STREAM_GET_ALL) {
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap);
......@@ -5704,9 +5790,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
maxTs = TMAX(maxTs, pBlock->info.window.ekey);
minTs = TMIN(minTs, pBlock->info.window.skey);
doStreamIntervalAggImpl(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdatedMap);
// doStreamIntervalAggImpl(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdatedMap);
// new disc buf
/*doStreamIntervalAggImpl2(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap);*/
doStreamIntervalAggImpl2(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap);
}
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs);
......@@ -5741,8 +5827,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
#endif
pOperator->status = OP_RES_TO_RETURN;
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdatedMap,
pInfo->pRecycledPages, pInfo->aggSup.pResultBuf);
closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdatedMap,
pOperator);
void* pIte = NULL;
while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) {
......@@ -5751,7 +5837,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
taosArraySort(pUpdated, resultrowComparAsc);
// new disc buf
finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset);
// finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated,
// pSup->rowEntryInfoOffset);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
removeDeleteResults(pUpdatedMap, pInfo->pDelWins);
......@@ -5762,9 +5849,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
return pInfo->pDelRes;
}
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
// doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
// new disc buf
// doBuildResult(pOperator, pInfo->binfo.pRes, &pInfo->groupResInfo);
doBuildResult(pOperator, pInfo->binfo.pRes, &pInfo->groupResInfo);
printDataBlock(pInfo->binfo.pRes, "single interval");
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
}
......@@ -5809,6 +5896,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
.calTrigger = pIntervalPhyNode->window.triggerType,
.maxTs = INT64_MIN,
.minTs = INT64_MAX,
.deleteMark = INT64_MAX,
};
ASSERT(twAggSupp.calTrigger != STREAM_TRIGGER_MAX_DELAY);
pOperator->pTaskInfo = pTaskInfo;
......
......@@ -140,15 +140,9 @@ int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void*
if (streamStateGet(pState, key, pVal, pVLen) == 0) {
return 0;
}
void* tmp = taosMemoryCalloc(1, size);
if (streamStatePut(pState, key, &tmp, size) == 0) {
taosMemoryFree(tmp);
int32_t code = streamStateGet(pState, key, pVal, pVLen);
ASSERT(code == 0);
return code;
}
taosMemoryFree(tmp);
return -1;
*pVal = tdbRealloc(NULL, size);
memset(*pVal, 0, size);
return 0;
}
int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal) {
......@@ -196,9 +190,14 @@ SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key
if (pCur == NULL) {
return NULL;
}
if (tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL) < 0) {
taosMemoryFree(pCur);
return NULL;
}
int32_t c;
if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
tdbTbcClose(pCur->pCur);
taosMemoryFree(pCur);
return NULL;
}
......@@ -217,9 +216,14 @@ SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const SWinKey* key
if (pCur == NULL) {
return NULL;
}
if (tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL) < 0) {
taosMemoryFree(pCur);
return NULL;
}
int32_t c;
if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
tdbTbcClose(pCur->pCur);
taosMemoryFree(pCur);
return NULL;
}
......
......@@ -424,6 +424,7 @@ int walLoadMeta(SWal* pWal) {
// find existing meta file
int metaVer = walFindCurMetaVer(pWal);
if (metaVer == -1) {
wDebug("vgId:%d wal find meta ver %d", pWal->cfg.vgId, metaVer);
return -1;
}
char fnameStr[WAL_FILE_LEN];
......
......@@ -621,6 +621,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP, "Rsma fetch msg is m
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_EMPTY_INFO, "Rsma info is empty")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_SCHEMA, "Rsma invalid schema")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_REGEX_MATCH, "Rsma regex match")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_STREAM_STATE_OPEN, "Rsma stream state open")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_STREAM_STATE_COMMIT, "Rsma stream state commit")
//index
TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册