未验证 提交 1e2fdd57 编写于 作者: L liuyao 提交者: GitHub

Merge pull request #13492 from taosdata/feature/TD-16285

feat(stream): distributed stream interval
......@@ -188,6 +188,13 @@ SDiskbasedBufStatis getDBufStatis(const SDiskbasedBuf* pBuf);
*/
void dBufPrintStatis(const SDiskbasedBuf* pBuf);
/**
* Set all of page buffer are not need
* @param pBuf
* @return
*/
void clearDiskbasedBuf(SDiskbasedBuf* pBuf);
#ifdef __cplusplus
}
#endif
......
......@@ -1500,7 +1500,7 @@ void blockDebugShowData(const SArray* dataBlocks, const char* flag) {
for (int32_t k = 0; k < colNum; k++) {
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
if (pColInfoData->hasNull) {
if (colDataIsNull(pColInfoData, rows, j, NULL)) {
printf(" %15s |", "NULL");
continue;
}
......
......@@ -488,6 +488,8 @@ typedef struct SStreamFinalIntervalOperatorInfo {
int32_t order; // current SSDataBlock scan order
STimeWindowAggSupp twAggSup;
SArray* pChildren;
SSDataBlock* pUpdateRes;
SPhysiNode* pPhyNode; // create new child
} SStreamFinalIntervalOperatorInfo;
typedef struct SAggOperatorInfo {
......@@ -793,9 +795,8 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild);
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
......
......@@ -4634,6 +4634,12 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
int32_t children = 8;
pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == type) {
int32_t children = 0;
pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode;
......
......@@ -956,16 +956,15 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
if (rows == 0) {
pOperator->status = OP_EXEC_DONE;
} else if (pInfo->pUpdateInfo) {
SSDataBlock* upRes = createOneDataBlock(pInfo->pRes, false);
getUpdateDataBlock(pInfo, true, pInfo->pRes, upRes);
if (upRes) {
pInfo->pUpdateRes = upRes;
if (upRes->info.type == STREAM_REPROCESS) {
blockDataCleanup(pInfo->pUpdateRes);
getUpdateDataBlock(pInfo, true, pInfo->pRes, pInfo->pUpdateRes);
if (pInfo->pUpdateRes->info.rows > 0) {
if (pInfo->pUpdateRes->info.type == STREAM_REPROCESS) {
pInfo->updateResIndex = 0;
pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
} else if (upRes->info.type == STREAM_INVERT) {
} else if (pInfo->pUpdateRes->info.type == STREAM_INVERT) {
pInfo->scanMode = STREAM_SCAN_FROM_RES;
return upRes;
return pInfo->pUpdateRes;
}
}
}
......@@ -1044,6 +1043,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
pInfo->tableUid = pScanPhyNode->uid;
pInfo->streamBlockReader = pHandle->reader;
pInfo->pRes = createResDataBlock(pDescNode);
pInfo->pUpdateRes = createResDataBlock(pDescNode);
pInfo->pCondition = pScanPhyNode->node.pConditions;
pInfo->pDataReader = pDataReader;
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
......
......@@ -23,7 +23,6 @@ typedef enum SResultTsInterpType {
RESULT_ROW_END_INTERP = 2,
} SResultTsInterpType;
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator);
static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator);
static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo);
......@@ -778,6 +777,22 @@ static int32_t saveResult(SResultRow* result, uint64_t groupId, SArray* pUpdated
return TSDB_CODE_SUCCESS;
}
static void removeResult(SArray* pUpdated, TSKEY key) {
int32_t size = taosArrayGetSize(pUpdated);
int32_t index = binarySearch(pUpdated, size, key, TSDB_ORDER_DESC, getReskey);
if (index >= 0 && key == getReskey(pUpdated, index)) {
taosArrayRemove(pUpdated, index);
}
}
static void removeResults(SArray* pWins, SArray* pUpdated) {
int32_t size = taosArrayGetSize(pWins);
for (int32_t i = 0; i < size; i++) {
STimeWindow* pW = taosArrayGet(pWins, i);
removeResult(pUpdated, pW->skey);
}
}
static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
int32_t scanFlag, SArray* pUpdated) {
SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
......@@ -1212,6 +1227,7 @@ void doClearWindow(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, char* pData, int
SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
SResultRowPosition* p1 =
(SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
ASSERT(p1);
doClearWindowImpl(p1, pSup->pResultBuf, pBinfo, numOfOutput);
}
......@@ -1363,6 +1379,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) {
taosMemoryFreeClear(pChildOp);
}
}
nodesDestroyNode(pInfo->pPhyNode);
}
static bool allInvertible(SqlFunctionCtx* pFCtx, int32_t numOfCols) {
......@@ -1485,69 +1502,6 @@ _error:
return NULL;
}
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval,
int32_t primaryTsSlotId, STimeWindowAggSupp* pTwAggSupp,
SExecTaskInfo* pTaskInfo) {
SStreamFinalIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamFinalIntervalOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
pInfo->order = TSDB_ORDER_ASC;
pInfo->interval = *pInterval;
pInfo->twAggSup = *pTwAggSupp;
pInfo->primaryTsIndex = primaryTsSlotId;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(pOperator, 4096);
int32_t code =
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1);
int32_t numOfChild = 8; // Todo(liuyao) get it from phy plan
pInfo->pChildren = taosArrayInit(numOfChild, sizeof(SOperatorInfo));
for (int32_t i = 0; i < numOfChild; i++) {
SSDataBlock* chRes = createOneDataBlock(pResBlock, false);
SOperatorInfo* pChildOp = createIntervalOperatorInfo(NULL, pExprInfo, numOfCols, chRes, pInterval, primaryTsSlotId,
pTwAggSupp, pTaskInfo);
if (pChildOp && chRes) {
taosArrayPush(pInfo->pChildren, &pChildOp);
continue;
}
goto _error;
}
pOperator->name = "StreamFinalIntervalOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExprInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->numOfExprs = numOfCols;
pOperator->info = pInfo;
pOperator->fpSet =
createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL, destroyStreamFinalIntervalOperatorInfo,
aggEncodeResultRow, aggDecodeResultRow, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
return pOperator;
_error:
destroyStreamFinalIntervalOperatorInfo(pInfo, numOfCols);
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
pTaskInfo->code = code;
return NULL;
}
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) {
......@@ -1913,12 +1867,12 @@ _error:
return NULL;
}
static SArray* doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, int32_t tableGroupId) {
static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock,
int32_t tableGroupId, SArray* pUpdated) {
SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info;
SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo);
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
int32_t numOfOutput = pOperatorInfo->numOfExprs;
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES);
int32_t step = 1;
bool ascScan = true;
TSKEY* tsCols = NULL;
......@@ -1929,7 +1883,7 @@ static SArray* doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataB
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
tsCols = (int64_t*)pColDataInfo->pData;
} else {
return pUpdated;
return ;
}
int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1);
......@@ -1946,13 +1900,13 @@ static SArray* doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataB
pos->groupId = tableGroupId;
pos->pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
*(int64_t*)pos->key = pResult->win.skey;
taosArrayPush(pUpdated, &pos);
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL,
TSDB_ORDER_ASC);
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos,
nextWin.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) {
saveResult(pResult, tableGroupId, pUpdated);
}
// window start(end) key interpolation
// disable it temporarily
// doWindowBorderInterpolation(pInfo, pSDataBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos,
// forwardRows);
// doWindowBorderInterpolation(pInfo, pSDataBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardRows);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
......@@ -1962,7 +1916,6 @@ static SArray* doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataB
break;
}
}
return pUpdated;
}
bool isFinalInterval(SStreamFinalIntervalOperatorInfo* pInfo) { return pInfo->pChildren != NULL; }
......@@ -2006,24 +1959,72 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SArra
}
}
static void clearStreamIntervalOperator(SStreamFinalIntervalOperatorInfo* pInfo) {
taosHashClear(pInfo->aggSup.pResultRowHashTable);
clearDiskbasedBuf(pInfo->aggSup.pResultBuf);
cleanupResultRowInfo(&pInfo->binfo.resultRowInfo);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 1);
}
static void clearUpdateDataBlock(SSDataBlock* pBlock) {
if (pBlock->info.rows <= 0) {
return;
}
blockDataCleanup(pBlock);
}
static void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex) {
ASSERT(pDest->info.capacity >= pSource->info.rows);
clearUpdateDataBlock(pDest);
SColumnInfoData* pDestCol = taosArrayGet(pDest->pDataBlock, 0);
SColumnInfoData* pSourceCol = taosArrayGet(pSource->pDataBlock, tsColIndex);
// copy timestamp column
colDataAssign(pDestCol, pSourceCol, pSource->info.rows);
for (int32_t i = 1; i < pDest->info.numOfCols; i++) {
SColumnInfoData* pCol = taosArrayGet(pDest->pDataBlock, i);
colDataAppendNNULL(pCol, 0, pSource->info.rows);
}
pDest->info.rows = pSource->info.rows;
blockDataUpdateTsWindow(pDest, 0);
}
static int32_t getChildIndex(SSDataBlock* pBlock) {
// if (pBlock->info.type != STREAM_INVALID && pBlock->info.rows < 4) { // for test
// return pBlock->info.rows - 1;
// }
return 0;
}
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info;
SOperatorInfo* downstream = pOperator->pDownstream[0];
SArray* pUpdated = NULL;
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES);
SArray* pClosed = taosArrayInit(4, POINTER_BYTES);
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
} else if (pOperator->status == OP_RES_TO_RETURN) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pInfo->binfo.pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pInfo->groupResInfo)) {
if (pInfo->binfo.pRes->info.rows == 0) {
pOperator->status = OP_EXEC_DONE;
if (isFinalInterval(pInfo) || pInfo->pUpdateRes->info.rows == 0) {
if (!isFinalInterval(pInfo)) {
// semi interval operator clear disk buffer
clearStreamIntervalOperator(pInfo);
}
return NULL;
}
// process the rest of the data
pOperator->status = OP_OPENED;
return pInfo->pUpdateRes;
}
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
return pInfo->binfo.pRes;
}
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
clearUpdateDataBlock(pInfo->pUpdateRes);
break;
}
......@@ -2033,31 +2034,149 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval, pInfo->primaryTsIndex, pOperator->numOfExprs,
pBlock, pUpWins);
if (isFinalInterval(pInfo)) {
int32_t childIndex = 0; // Todo(liuyao) get child id from SSDataBlock
int32_t childIndex = getChildIndex(pBlock);
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
SIntervalAggOperatorInfo* pChildInfo = pChildOp->info;
doClearWindows(&pChildInfo->aggSup, &pChildInfo->binfo, &pChildInfo->interval, pChildInfo->primaryTsIndex,
pChildOp->numOfExprs, pBlock, NULL);
rebuildIntervalWindow(pInfo, pUpWins, pInfo->binfo.pRes->info.groupId, pOperator->numOfExprs,
pOperator->pTaskInfo);
doClearWindows(&pChildInfo->aggSup, &pChildInfo->binfo, &pChildInfo->interval,
pChildInfo->primaryTsIndex, pChildOp->numOfExprs, pBlock, NULL);
rebuildIntervalWindow(pInfo, pUpWins, pInfo->binfo.pRes->info.groupId,
pOperator->numOfExprs, pOperator->pTaskInfo);
taosArrayDestroy(pUpWins);
continue;
}
removeResults(pUpWins, pUpdated);
copyUpdateDataBlock(pInfo->pUpdateRes, pBlock, pInfo->primaryTsIndex);
taosArrayDestroy(pUpWins);
continue;
break;
}
if (isFinalInterval(pInfo)) {
int32_t chIndex = 1; // Todo(liuyao) get it from SSDataBlock
int32_t chIndex = getChildIndex(pBlock);
int32_t size = taosArrayGetSize(pInfo->pChildren);
// if chIndex + 1 - size > 0, add new child
for (int32_t i = 0; i < chIndex + 1 - size; i++) {
SOperatorInfo* pChildOp = createStreamFinalIntervalOperatorInfo(NULL, pInfo->pPhyNode, pOperator->pTaskInfo, 0);
if (!pChildOp) {
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
taosArrayPush(pInfo->pChildren, &pChildOp);
}
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex);
doStreamIntervalAgg(pChildOp);
SStreamFinalIntervalOperatorInfo* pChInfo = pChildOp->info;
setInputDataBlock(pChildOp, pChInfo->binfo.pCtx, pBlock, pChInfo->order, MAIN_SCAN, true);
doHashInterval(pChildOp, pBlock, pBlock->info.groupId, NULL);
}
doHashInterval(pOperator, pBlock, pBlock->info.groupId, pUpdated);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
}
if (isFinalInterval(pInfo)) {
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup,
&pInfo->interval, pClosed);
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pClosed,
pInfo->binfo.rowCellInfoOffset);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
taosArrayAddAll(pUpdated, pClosed);
}
pUpdated = doHashInterval(pOperator, pBlock, 0);
}
taosArrayDestroy(pClosed);
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
pOperator->status = OP_RES_TO_RETURN;
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
if (pInfo->binfo.pRes->info.rows == 0) {
pOperator->status = OP_EXEC_DONE;
if (pInfo->pUpdateRes->info.rows == 0) {
return NULL;
}
// process the rest of the data
pOperator->status = OP_OPENED;
return pInfo->pUpdateRes;
}
return pInfo->binfo.pRes;
}
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild) {
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
SStreamFinalIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamFinalIntervalOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
pInfo->order = TSDB_ORDER_ASC;
pInfo->interval = (SInterval) {.interval = pIntervalPhyNode->interval,
.sliding = pIntervalPhyNode->sliding,
.intervalUnit = pIntervalPhyNode->intervalUnit,
.slidingUnit = pIntervalPhyNode->slidingUnit,
.offset = pIntervalPhyNode->offset,
.precision =
((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
pInfo->twAggSup = (STimeWindowAggSupp){.waterMark = pIntervalPhyNode->window.watermark,
.calTrigger = pIntervalPhyNode->window.triggerType,
.maxTs = INT64_MIN,
.winMap = NULL, };
pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(pOperator, 4096);
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols,
pResBlock, keyBufSize, pTaskInfo->id.str);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1);
pInfo->pChildren = NULL;
if (numOfChild > 0) {
pInfo->pChildren = taosArrayInit(numOfChild, sizeof(SOperatorInfo));
for (int32_t i = 0; i < numOfChild; i++) {
SOperatorInfo* pChildOp = createStreamFinalIntervalOperatorInfo(NULL, pPhyNode, pTaskInfo, 0);
if (pChildOp) {
taosArrayPush(pInfo->pChildren, &pChildOp);
continue;
}
goto _error;
}
}
// semi interval operator does not catch result
if (!isFinalInterval(pInfo)) {
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
}
pInfo->pUpdateRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc);\
pInfo->pUpdateRes->info.type = STREAM_REPROCESS;
blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
pInfo->pPhyNode = nodesCloneNode(pPhyNode);
pOperator->name = "StreamFinalIntervalOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExprInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->numOfExprs = numOfCols;
pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL,
destroyStreamFinalIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow,
NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
return pOperator;
_error:
destroyStreamFinalIntervalOperatorInfo(pInfo, numOfCols);
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
pTaskInfo->code = code;
return NULL;
}
void destroyStreamAggSupporter(SStreamAggSupporter* pSup) {
......
......@@ -67,6 +67,7 @@ bool leastSQRFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInf
int32_t leastSQRFunction(SqlFunctionCtx* pCtx);
int32_t leastSQRFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t leastSQRInvertFunction(SqlFunctionCtx* pCtx);
int32_t leastSQRCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
bool getPercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
......
......@@ -1118,7 +1118,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = leastSQRFunctionSetup,
.processFunc = leastSQRFunction,
.finalizeFunc = leastSQRFinalize,
.invertFunc = leastSQRInvertFunction,
.invertFunc = NULL,
.combineFunc = leastSQRCombine,
},
{
.name = "avg",
......
......@@ -1799,17 +1799,17 @@ int32_t leastSQRFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
param[1][1] = (double)pInfo->num;
param[1][0] = param[0][1];
param[0][0] -= param[1][0] * (param[0][1] / param[1][1]);
param[0][2] -= param[1][2] * (param[0][1] / param[1][1]);
param[0][1] = 0;
param[1][2] -= param[0][2] * (param[1][0] / param[0][0]);
param[1][0] = 0;
param[0][2] /= param[0][0];
double param00 = param[0][0] - param[1][0] * (param[0][1] / param[1][1]);
double param02 = param[0][2] - param[1][2] * (param[0][1] / param[1][1]);
// param[0][1] = 0;
double param12 = param[1][2] - param02 * (param[1][0] / param00);
// param[1][0] = 0;
param02 /= param00;
param[1][2] /= param[1][1];
param12 /= param[1][1];
char buf[64] = {0};
size_t len = snprintf(varDataVal(buf), sizeof(buf) - VARSTR_HEADER_SIZE, "{slop:%.6lf, intercept:%.6lf}", param[0][2], param[1][2]);
size_t len = snprintf(varDataVal(buf), sizeof(buf) - VARSTR_HEADER_SIZE, "{slop:%.6lf, intercept:%.6lf}", param02, param12);
varDataSetLen(buf, len);
colDataAppend(pCol, currentRow, buf, pResInfo->isNullRes);
......@@ -1822,6 +1822,27 @@ int32_t leastSQRInvertFunction(SqlFunctionCtx* pCtx) {
return TSDB_CODE_SUCCESS;
}
int32_t leastSQRCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
SLeastSQRInfo* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
int32_t type = pDestCtx->input.pData[0]->info.type;
double (*pDparam)[3] = pDBuf->matrix;
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
SLeastSQRInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
double (*pSparam)[3] = pSBuf->matrix;
for (int32_t i = 0; i < pSBuf->num; i++) {
pDparam[0][0] += pDBuf->startVal * pDBuf->startVal;
pDparam[0][1] += pDBuf->startVal;
pDBuf->startVal += pDBuf->stepVal;
}
pDparam[0][2] += pSparam[0][2] + pDBuf->num * pDBuf->stepVal * pSparam[1][2];
pDparam[1][2] += pSparam[1][2];
pDBuf->num += pSBuf->num;
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
return TSDB_CODE_SUCCESS;
}
bool getPercentileFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SPercentileInfo);
return true;
......
......@@ -2441,6 +2441,7 @@ static const char* jkValueLiteralSize = "LiteralSize";
static const char* jkValueLiteral = "Literal";
static const char* jkValueDuration = "Duration";
static const char* jkValueTranslate = "Translate";
static const char* jkValueNotReserved = "NotReserved";
static const char* jkValueDatum = "Datum";
static int32_t datumToJson(const void* pObj, SJson* pJson) {
......@@ -2513,6 +2514,9 @@ static int32_t valueNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkValueTranslate, pNode->translate);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkValueNotReserved, pNode->notReserved);
}
if (TSDB_CODE_SUCCESS == code && pNode->translate) {
code = datumToJson(pNode, pJson);
}
......@@ -2634,6 +2638,9 @@ static int32_t jsonToValueNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkValueTranslate, &pNode->translate);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkValueNotReserved, &pNode->notReserved);
}
if (TSDB_CODE_SUCCESS == code && pNode->translate) {
code = jsonToDatum(pJson, pNode);
}
......
......@@ -651,3 +651,31 @@ void dBufPrintStatis(const SDiskbasedBuf* pBuf) {
ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f, ps->loadPages,
ps->loadBytes / (1024.0 * ps->loadPages));
}
void clearDiskbasedBuf(SDiskbasedBuf* pBuf) {
SArray** p = taosHashIterate(pBuf->groupSet, NULL);
while (p) {
size_t n = taosArrayGetSize(*p);
for (int32_t i = 0; i < n; ++i) {
SPageInfo* pi = taosArrayGetP(*p, i);
taosMemoryFreeClear(pi->pData);
taosMemoryFreeClear(pi);
}
taosArrayDestroy(*p);
p = taosHashIterate(pBuf->groupSet, p);
}
tdListEmpty(pBuf->lruList);
tdListEmpty(pBuf->freePgList);
taosArrayClear(pBuf->emptyDummyIdList);
taosArrayClear(pBuf->pFree);
taosHashClear(pBuf->groupSet);
taosHashClear(pBuf->all);
pBuf->numOfPages = 0; // all pages are in buffer in the first place
pBuf->totalBufSize = 0;
pBuf->allocateId = -1;
pBuf->fileSize = 0;
}
\ No newline at end of file
......@@ -72,6 +72,10 @@
./test.sh -f tsim/stream/basic2.sim
# ./test.sh -f tsim/stream/session0.sim
# ./test.sh -f tsim/stream/session1.sim
# ./test.sh -f tsim/stream/state0.sim
# ./test.sh -f tsim/stream/triggerInterval0.sim
# ./test.sh -f tsim/stream/triggerSession0.sim
# ---- transaction
./test.sh -f tsim/trans/lossdata1.sim
......
......@@ -23,89 +23,98 @@ sql insert into t1 values(1648791223001,10,2,3,1.1,2);
sql insert into t1 values(1648791233002,3,2,3,2.1,3);
sql insert into t1 values(1648791243003,NULL,NULL,NULL,NULL,4);
sql insert into t1 values(1648791213002,NULL,NULL,NULL,NULL,5) (1648791233012,NULL,NULL,NULL,NULL,6);
$loop_count = 0
loop0:
sleep 300
sql select * from streamt order by s desc;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $data01 != 3 then
print ======$data01
return -1
print ======data01=$data01
goto loop0
endi
if $data02 != 3 then
print ======$data02
return -1
print ======data02=$data02
goto loop0
endi
if $data03 != 3 then
print ======$data03
return -1
print ======data03=$data03
goto loop0
endi
if $data04 != 2.100000000 then
print ======$data04
print ======data04=$data04
return -1
endi
if $data05 != 0.000000000 then
print ======$data05
print ======data05=$data05
return -1
endi
if $data06 != 3 then
print ======$data05
print ======data06=$data06
return -1
endi
if $data07 != 2.100000000 then
print ======$data05
print ======data07=$data07
return -1
endi
if $data08 != 6 then
print ======$data05
print ======data08=$data08
return -1
endi
# row 1
if $data11 != 3 then
print ======$data11
return -1
print ======data11=$data11
goto loop0
endi
if $data12 != 10 then
print ======$data12
return -1
print ======data12=$data12
goto loop0
endi
if $data13 != 10 then
print ======$data13
return -1
print ======data13=$data13
goto loop0
endi
if $data14 != 1.100000000 then
print ======$data14
print ======data14=$data14
return -1
endi
if $data15 != 0.000000000 then
print ======$data15
print ======data15=$data15
return -1
endi
if $data16 != 10 then
print ======$data15
print ======data16=$data16
return -1
endi
if $data17 != 1.100000000 then
print ======$data17
print ======data17=$data17
return -1
endi
if $data18 != 5 then
print ======$data18
print ======data18=$data18
return -1
endi
......@@ -115,23 +124,31 @@ sql insert into t1 values(1648791233002,3,2,3,2.1,9);
sql insert into t1 values(1648791243003,4,2,3,3.1,10);
sql insert into t1 values(1648791213002,4,2,3,4.1,11) ;
sql insert into t1 values(1648791213002,4,2,3,4.1,12) (1648791223009,4,2,3,4.1,13);
$loop_count = 0
loop1:
sleep 300
sql select * from streamt order by s desc ;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $data01 != 7 then
print ======$data01
return -1
print =====data01=$data01
goto loop1
endi
if $data02 != 9 then
print ======$data02
return -1
print =====data02=$data02
goto loop1
endi
if $data03 != 4 then
print ======$data03
return -1
print =====data03=$data03
goto loop1
endi
if $data04 != 1.100000000 then
......
......@@ -20,21 +20,33 @@ sql create stream streams1 trigger at_once into streamt1 as select _wstartts,
sql insert into t1 values(1648791213000,1,2,3,1.0,1);
sql insert into t1 values(1648791213000,1,2,3,1.0,2);
$loop_count = 0
loop0:
sql select * from streamt1 order by c desc;
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 1 then
print ======$rows
return -1;
print =====rows=$rows
goto loop0
endi
sql insert into t1 values(1648791214000,1,2,3,1.0,3);
$loop_count = 0
loop00:
sql select * from streamt1 order by c desc;
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 1 then
print ======$rows
return -1;
print =====rows=$rows
goto loop00
endi
sql insert into t1 values(1648791213010,2,2,3,1.0,4);
......@@ -44,27 +56,25 @@ $loop_count = 0
loop1:
sql select * from streamt1 where c >=4 order by `_wstartts`;
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 3 then
print ======$rows
print =====rows=$rows
goto loop1
return -1
endi
# row 0
if $data01 != 1 then
print ======$data01
return -1
print =====data01=$data01
goto loop1
endi
if $data02 != 1 then
print ======$data02
return -1
print =====data02=$data02
goto loop1
endi
if $data03 != 1 then
......@@ -151,11 +161,10 @@ endi
sql insert into t1 values(1648791213011,1,2,3,1.0,7);
loop2:
$loop_count = 0
loop2:
sql select * from streamt1 where c in (5,4,7) order by `_wstartts`;
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
......@@ -163,57 +172,51 @@ endi
# row 2
if $data21 != 2 then
print ======$data21
print =====data21=$data21
goto loop2
return -1
endi
if $data22 != 2 then
print ======$data22
print =====data22=$data22
goto loop2
return -1
endi
if $data23 != 2 then
print ======$data23
goto loop2
return -1
endi
if $data24 != 1 then
print ======$data24
goto loop2
return -1
endi
if $data25 != 3 then
print ======$data25
goto loop2
return -1
endi
if $data26 != 7 then
print ======$data26
goto loop2
return -1
endi
sql insert into t1 values(1648791213011,1,2,3,1.0,8);
loop21:
$loop_count = 0
loop21:
sql select * from streamt1 where c in (5,4,8) order by `_wstartts`;
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data26 != 8 then
print ======$data26
print =====data26=$data26
goto loop21
return -1
endi
......@@ -222,11 +225,10 @@ sql insert into t1 values(1648791213020,3,2,3,1.0,10);
sql insert into t1 values(1648791214000,1,2,3,1.0,11);
sql insert into t1 values(1648791213011,10,20,10,10.0,12);
loop3:
$loop_count = 0
loop3:
sql select * from streamt1 where c in (5,4,10,11,12) order by `_wstartts`;
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
......@@ -234,112 +236,100 @@ endi
# row 2
if $data21 != 1 then
print ======$data21
print =====data21=$data21
goto loop3
return -1
endi
if $data22 != 1 then
print ======$data22
print =====data22=$data22
goto loop3
return -1
endi
if $data23 != 10 then
print ======$data23
goto loop3
return -1
endi
if $data24 != 10 then
print ======$data24
goto loop3
return -1
endi
if $data25 != 10 then
print ======$data25
goto loop3
return -1
endi
if $data26 != 12 then
print ======$data26
goto loop3
return -1
endi
# row 3
if $data31 != 1 then
print ======$data31
print =====data31=$data31
goto loop3
return -1
endi
if $data32 != 1 then
print ======$data32
print =====data32=$data32
goto loop3
return -1
endi
if $data33 != 3 then
print ======$data33
goto loop3
return -1
endi
if $data34 != 3 then
print ======$data34
goto loop3
return -1
endi
if $data35 != 3 then
print ======$data35
goto loop3
return -1
endi
if $data36 != 10 then
print ======$data36
goto loop3
return -1
endi
# row 4
if $data41 != 1 then
print ======$data41
print =====data41=$data41
goto loop3
return -1
endi
if $data42 != 1 then
print ======$data42
print =====data42=$data42
goto loop3
return -1
endi
if $data43 != 1 then
print ======$data43
goto loop3
return -1
endi
if $data44 != 1 then
print ======$data44
goto loop3
return -1
endi
if $data45 != 3 then
print ======$data45
goto loop3
return -1
endi
if $data46 != 11 then
print ======$data46
goto loop3
return -1
endi
......@@ -347,8 +337,8 @@ sql insert into t1 values(1648791213030,3,12,12,12.0,13);
sql insert into t1 values(1648791214040,1,13,13,13.0,14);
sql insert into t1 values(1648791213030,3,14,14,14.0,15) (1648791214020,15,15,15,15.0,16);
loop4:
$loop_count = 0
loop4:
sql select * from streamt1 where c in (14,15,16) order by `_wstartts`;
sleep 300
......@@ -358,119 +348,104 @@ if $loop_count == 10 then
endi
if $rows != 3 then
print ======$rows
goto loop4
return -1;
print ====loop4=rows=$rows
# goto loop4
endi
# row 0
if $data01 != 2 then
print ======$data01
print =====data01=$data01
goto loop4
return -1
endi
if $data02 != 2 then
print ======$data02
goto loop4
return -1
endi
if $data03 != 6 then
print ======$data03
goto loop4
return -1
endi
if $data04 != 3 then
print ======$data04
goto loop4
return -1
endi
if $data05 != 3 then
print ======$data05
goto loop4
return -1
endi
if $data06 != 15 then
print ======$data06
goto loop4
return -1
endi
# row 1
if $data11 != 1 then
print ======$data11
print =====data11=$data11
goto loop4
return -1
endi
if $data12 != 1 then
print ======$data12
print =====data12=$data12
goto loop4
return -1
endi
if $data13 != 15 then
print ======$data13
goto loop4
return -1
endi
if $data14 != 15 then
print ======$data14
goto loop4
return -1
endi
if $data15 != 15 then
print ======$data15
goto loop4
return -1
endi
if $data16 != 16 then
print ======$data16
goto loop4
return -1
endi
# row 2
if $data21 != 1 then
print ======$data21
print =====data21=$data21
goto loop4
return -1
endi
if $data22 != 1 then
print ======$data22
print =====data22=$data22
goto loop4
return -1
endi
if $data23 != 1 then
print ======$data23
goto loop4
return -1
endi
if $data24 != 1 then
print ======$data24
goto loop4
return -1
endi
if $data25 != 13 then
print ======$data25
goto loop4
return -1
endi
if $data26 != 14 then
print ======$data26
goto loop4
return -1
endi
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册