提交 0778bf88 编写于 作者: 5 54liuyao

stream session operator

上级 ef461a95
......@@ -39,6 +39,7 @@ typedef bool (*FExecInit)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInf
typedef int32_t (*FExecProcess)(struct SqlFunctionCtx *pCtx);
typedef int32_t (*FExecFinalize)(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock);
typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
typedef int32_t (*FExecCombine)(struct SqlFunctionCtx *pDestCtx, struct SqlFunctionCtx *pSourceCtx);
typedef struct SScalarFuncExecFuncs {
FExecGetEnv getEnv;
......@@ -50,6 +51,7 @@ typedef struct SFuncExecFuncs {
FExecInit init;
FExecProcess process;
FExecFinalize finalize;
FExecCombine combine;
} SFuncExecFuncs;
typedef struct SFileBlockInfo {
......
......@@ -212,6 +212,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_FILL,
QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW,
QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW,
QUERY_NODE_PHYSICAL_PLAN_PARTITION,
QUERY_NODE_PHYSICAL_PLAN_DISPATCH,
......
......@@ -296,6 +296,8 @@ typedef struct SSessionWinodwPhysiNode {
int64_t gap;
} SSessionWinodwPhysiNode;
typedef SSessionWinodwPhysiNode SStreamSessionWinodwPhysiNode;
typedef struct SStateWinodwPhysiNode {
SWinodwPhysiNode window;
SNode* pStateKey;
......
......@@ -361,6 +361,18 @@ typedef struct SCatchSupporter {
int64_t* pKeyBuf;
} SCatchSupporter;
typedef struct SStreamAggSupporter {
SArray* pResultRows; // SResultWindowInfo
int32_t keySize;
char* pKeyBuf; // window key buffer
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
} SStreamAggSupporter;
typedef struct SessionWindowSupporter {
SStreamAggSupporter* pStreamAggSup;
int64_t gap;
} SessionWindowSupporter;
typedef struct SStreamBlockScanInfo {
SArray* pBlockLists; // multiple SSDatablock.
SSDataBlock* pRes; // result SSDataBlock
......@@ -385,6 +397,7 @@ typedef struct SStreamBlockScanInfo {
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
SCatchSupporter childAggSup;
SArray* childIds;
SessionWindowSupporter sessionSup;
} SStreamBlockScanInfo;
typedef struct SSysTableScanInfo {
......@@ -550,6 +563,27 @@ typedef struct SSessionAggOperatorInfo {
STimeWindowAggSupp twAggSup;
} SSessionAggOperatorInfo;
typedef struct SResultWindowInfo {
SResultRowPosition pos;
STimeWindow win;
bool isOutput;
} SResultWindowInfo;
typedef struct SStreamSessionAggOperatorInfo {
SOptrBasicInfo binfo;
SStreamAggSupporter streamAggSup;
SGroupResInfo groupResInfo;
int64_t gap; // session window gap
int32_t primaryTsIndex; // primary timestamp slot id
int32_t order; // current SSDataBlock scan order
STimeWindowAggSupp twAggSup;
SSDataBlock* pWinBlock; // window result
SqlFunctionCtx* pDummyCtx; // for combine
SSDataBlock* pDelRes;
SHashObj* pStDeleted;
void* pDelIterator;
} SStreamSessionAggOperatorInfo;
typedef struct STimeSliceOperatorInfo {
SOptrBasicInfo binfo;
SInterval interval;
......@@ -727,6 +761,9 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo*
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput, SSDataBlock* pResBlock, SArray* pColMatchInfo, STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream,
SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap,
int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo);
#if 0
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
#endif
......@@ -761,13 +798,19 @@ void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasi
int32_t* length);
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts,
SInterval* pInterval, int32_t precision, STimeWindow* win);
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos,
TSKEY ekey, __block_search_fn_t searchFn, STableQueryInfo* item,
int32_t order);
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn,
int32_t startPos, TSKEY ekey, __block_search_fn_t searchFn, STableQueryInfo* item,
int32_t order);
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, size_t keyBufSize,
const char* pKey, const char* pDir);
int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, const char* pKey,
const char* pDir);
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey);
SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize);
SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap,
int32_t* pIndex);
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t rows,
int32_t start, int64_t gap, SHashObj* pStDeleted);
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
#ifdef __cplusplus
}
#endif
......
......@@ -98,7 +98,6 @@ static int32_t getExprFunctionId(SExprInfo* pExprInfo) {
}
static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes);
static bool functionNeedToExecute(SqlFunctionCtx* pCtx);
static void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pSDataBlock);
......@@ -937,7 +936,7 @@ int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char*
return TSDB_CODE_SUCCESS;
}
static bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
// in case of timestamp column, always generated results.
......@@ -4660,6 +4659,19 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOptr =
createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW == type) {
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
STimeWindowAggSupp as = {.waterMark = pSessionNode->window.watermark,
.calTrigger = pSessionNode->window.triggerType};
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
pOptr =
createStreamSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*)pPhyNode;
SArray* pColList = extractPartitionColInfo(pPartNode->pPartitionKeys);
......@@ -5151,15 +5163,37 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo
return TSDB_CODE_SUCCESS;
}
int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, size_t keyBufSize, const char* pKey,
const char* pDir) {
int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, const char* pKey,
const char* pDir) {
pCatchSup->keySize = sizeof(int64_t) + sizeof(int64_t) + sizeof(TSKEY);
pCatchSup->pKeyBuf = taosMemoryCalloc(1, pCatchSup->keySize);
int32_t pageSize = rowSize * 32;
int32_t bufSize = pageSize * 4096;
createDiskbasedBuf(&pCatchSup->pDataBuf, pageSize, bufSize, pKey, pDir);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pCatchSup->pWindowHashTable = taosHashInit(10000, hashFn, true, HASH_NO_LOCK);
;
return TSDB_CODE_SUCCESS;
if (pCatchSup->pKeyBuf == NULL || pCatchSup->pWindowHashTable == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t pageSize = rowSize * 32;
int32_t bufSize = pageSize * 4096;
return createDiskbasedBuf(&pCatchSup->pDataBuf, pageSize, bufSize, pKey, pDir);
}
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey) {
pSup->keySize = sizeof(int64_t) + sizeof(TSKEY);
pSup->pKeyBuf = taosMemoryCalloc(1, pSup->keySize);
pSup->pResultRows = taosArrayInit(1024, sizeof(SResultWindowInfo));
if (pSup->pKeyBuf == NULL || pSup->pResultRows == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t pageSize = 4096;
while (pageSize < pSup->resultRowSize * 4) {
pageSize <<= 1u;
}
// at least four pages need to be in buffer
int32_t bufSize = 4096 * 256;
if (bufSize <= pageSize) {
bufSize = pageSize * 4;
}
return createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, pKey, "/tmp/");
}
......@@ -645,6 +645,10 @@ static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) {
taosArrayClear(pInfo->pBlockLists);
}
static bool isSessionWindow(SStreamBlockScanInfo* pInfo) {
return pInfo->sessionSup.pStreamAggSup != NULL;
}
static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
SSDataBlock* pSDB = pInfo->pUpdateRes;
if (pInfo->updateResIndex < pSDB->info.rows) {
......@@ -652,13 +656,25 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
TSKEY *tsCols = (TSKEY*)pColDataInfo->pData;
SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[pInfo->updateResIndex], &pInfo->interval,
pInfo->interval.precision, NULL);
STimeWindow win;
if (isSessionWindow(pInfo)) {
SStreamAggSupporter* pAggSup = pInfo->sessionSup.pStreamAggSup;
int64_t gap = pInfo->sessionSup.gap;
int32_t winIndex = 0;
SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup->pResultRows,
tsCols[pInfo->updateResIndex], gap, &winIndex);
win = pCurWin->win;
pInfo->updateResIndex += updateSessionWindowInfo(pCurWin, tsCols, pSDB->info.rows,
pInfo->updateResIndex, gap, NULL);
} else {
win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[pInfo->updateResIndex],
&pInfo->interval, pInfo->interval.precision, NULL);
pInfo->updateResIndex += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, pInfo->updateResIndex,
win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
}
STableScanInfo* pTableScanInfo = pInfo->pOperatorDumy->info;
pTableScanInfo->cond.twindow = win;
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond);
pInfo->updateResIndex += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, pInfo->updateResIndex,
win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
pTableScanInfo->scanTimes = 0;
return true;
} else {
......@@ -848,6 +864,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
} else if (pInfo->scanMode == STREAM_SCAN_FROM_UPDATERES) {
blockDataCleanup(pInfo->pRes);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
prepareDataScan(pInfo);
return pInfo->pUpdateRes;
} else if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER) {
SSDataBlock* pSDB = doDataScan(pInfo);
......@@ -924,13 +941,12 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
if (rows == 0) {
pOperator->status = OP_EXEC_DONE;
} else if (pInfo->interval.interval > 0) {
} else if (pInfo->pUpdateInfo) {
SSDataBlock* upRes = getUpdateDataBlock(pInfo, true); //TODO(liuyao) get invertible from plan
if (upRes) {
pInfo->pUpdateRes = upRes;
if (upRes->info.type == STREAM_REPROCESS) {
pInfo->updateResIndex = 0;
prepareDataScan(pInfo);
pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
} else if (upRes->info.type == STREAM_INVERT) {
pInfo->scanMode = STREAM_SCAN_FROM_RES;
......@@ -1001,10 +1017,9 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
pInfo->pOperatorDumy = pOperatorDumy;
pInfo->interval = pSTInfo->interval;
pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1};
size_t childKeyBufSize = sizeof(int64_t) + sizeof(int64_t) + sizeof(TSKEY);
initCatchSupporter(&pInfo->childAggSup, 1024, childKeyBufSize,
"StreamFinalInterval", TD_TMP_DIR_PATH); // TODO(liuyao) get row size from phy plan
initCatchSupporter(&pInfo->childAggSup, 1024, "StreamFinalInterval", "/tmp/"); // TODO(liuyao) get row size from phy plan
pOperator->name = "StreamBlockScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
......
......@@ -9,6 +9,7 @@ typedef enum SResultTsInterpType {
} SResultTsInterpType;
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator);
static SSDataBlock* doStreamSessionWindowAgg(SOperatorInfo* pOperator);
/*
* There are two cases to handle:
......@@ -1039,13 +1040,9 @@ static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type
}
}
void doClearWindow(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, char* pData,
int16_t bytes, uint64_t groupId, int32_t numOfOutput) {
SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
SResultRowPosition* p1 =
(SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf,
GET_RES_WINDOW_KEY_LEN(bytes));
SResultRow* pResult = getResultRowByPos(pSup->pResultBuf, p1);
void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf,
SOptrBasicInfo* pBinfo, int32_t numOfOutput) {
SResultRow* pResult = getResultRowByPos(pResultBuf, p1);
SqlFunctionCtx* pCtx = pBinfo->pCtx;
for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].resultInfo = getResultCell(pResult, i, pBinfo->rowCellInfoOffset);
......@@ -1060,6 +1057,15 @@ void doClearWindow(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, char* pData,
}
}
void doClearWindow(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, char* pData,
int16_t bytes, uint64_t groupId, int32_t numOfOutput) {
SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
SResultRowPosition* p1 =
(SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf,
GET_RES_WINDOW_KEY_LEN(bytes));
doClearWindowImpl(p1, pSup->pResultBuf, pBinfo, numOfOutput);
}
static void doClearWindows(SAggSupporter* pSup, SOptrBasicInfo* pBinfo,
SInterval* pIntrerval, int32_t tsIndex, int32_t numOfOutput, SSDataBlock* pBlock) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, tsIndex);
......@@ -1112,8 +1118,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
}
if (pBlock->info.type == STREAM_REPROCESS) {
doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval,
pInfo->primaryTsIndex, pOperator->numOfExprs, pBlock);
doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval, 0,
pOperator->numOfExprs, pBlock);
qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo));
continue;
}
......@@ -1644,9 +1650,10 @@ _error:
return NULL;
}
static SArray* doHashInterval(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock,
static SArray* doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock,
int32_t tableGroupId) {
SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info;
SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo);
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
int32_t numOfOutput = pOperatorInfo->numOfExprs;
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES);
......@@ -1659,7 +1666,10 @@ static SArray* doHashInterval(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRes
if (pSDataBlock->pDataBlock != NULL) {
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
tsCols = (int64_t*)pColDataInfo->pData;
} else {
return pUpdated;
}
int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1);
TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols, pSDataBlock->info.rows, ascScan);
STimeWindow nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts,
......@@ -1720,7 +1730,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
pInfo->primaryTsIndex, pOperator->numOfExprs, pBlock);
continue;
}
pUpdated = doHashInterval(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0);
pUpdated = doHashInterval(pOperator, pBlock, 0);
}
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset);
......@@ -1730,3 +1740,534 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
pOperator->status = OP_RES_TO_RETURN;
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
}
void destroyStreamAggSupporter(SStreamAggSupporter* pSup) {
taosArrayDestroy(pSup->pResultRows);
taosMemoryFreeClear(pSup->pKeyBuf);
destroyDiskbasedBuf(pSup->pResultBuf);
}
void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) {
SStreamSessionAggOperatorInfo* pInfo = (SStreamSessionAggOperatorInfo*)param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
destroyStreamAggSupporter(&pInfo->streamAggSup);
cleanupGroupResInfo(&pInfo->groupResInfo);
}
int32_t initBiasicInfo(SOptrBasicInfo* pBasicInfo, SExprInfo* pExprInfo,
int32_t numOfCols, SSDataBlock* pResultBlock, SDiskbasedBuf* pResultBuf) {
pBasicInfo->pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pBasicInfo->rowCellInfoOffset);
pBasicInfo->pRes = pResultBlock;
for (int32_t i = 0; i < numOfCols; ++i) {
pBasicInfo->pCtx[i].pBuf = pResultBuf;
}
return TSDB_CODE_SUCCESS;
}
void initDummyFunction(SqlFunctionCtx* pDummy, SqlFunctionCtx* pCtx, int32_t nums) {
for (int i = 0; i < nums; i++) {
pDummy[i].functionId = pCtx[i].functionId;
}
}
void initDownStream(SOperatorInfo* downstream, SStreamSessionAggOperatorInfo* pInfo) {
ASSERT(downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
SStreamBlockScanInfo* pScanInfo = downstream->info;
pScanInfo->sessionSup =
(SessionWindowSupporter){.pStreamAggSup = &pInfo->streamAggSup, .gap = pInfo->gap};
pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, 60000 * 60 * 6);
}
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream,
SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap,
int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) {
SStreamSessionAggOperatorInfo* pInfo =
taosMemoryCalloc(1, sizeof(SStreamSessionAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
initResultSizeInfo(pOperator, 4096);
int32_t code = initStreamAggSupporter(&pInfo->streamAggSup, "StreamSessionAggOperatorInfo");
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
code = initBiasicInfo(&pInfo->binfo, pExprInfo, numOfCols, pResBlock,
pInfo->streamAggSup.pResultBuf);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->streamAggSup.resultRowSize = getResultRowSize(pInfo->binfo.pCtx, numOfCols);
pInfo->pDummyCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfCols, sizeof(SqlFunctionCtx));
if (pInfo->pDummyCtx == NULL) {
goto _error;
}
initDummyFunction(pInfo->pDummyCtx, pInfo->binfo.pCtx, numOfCols);
pInfo->twAggSup = *pTwAggSupp;
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
pInfo->primaryTsIndex = tsSlotId;
pInfo->gap = gap;
pInfo->binfo.pRes = pResBlock;
pInfo->order = TSDB_ORDER_ASC;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pStDeleted = taosHashInit(64, hashFn, true, HASH_NO_LOCK);
pInfo->pDelIterator = NULL;
pInfo->pDelRes = createOneDataBlock(pResBlock, false);
blockDataEnsureCapacity(pInfo->pDelRes, 64);
pOperator->name = "StreamSessionWindowAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExprInfo;
pOperator->numOfExprs = numOfCols;
pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamSessionWindowAgg,
NULL, NULL, destroyStreamSessionAggOperatorInfo, aggEncodeResultRow,
aggDecodeResultRow, NULL);
pOperator->pTaskInfo = pTaskInfo;
initDownStream(downstream, pInfo);
code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
_error:
if (pInfo != NULL) {
destroyStreamSessionAggOperatorInfo(pInfo, numOfCols);
}
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
pTaskInfo->code = code;
return NULL;
}
typedef int64_t (*__get_value_fn_t)(void* data, int32_t index);
int32_t binarySearch(void* keyList, int num, TSKEY key, int order,
__get_value_fn_t getValuefn) {
int firstPos = 0, lastPos = num - 1, midPos = -1;
int numOfRows = 0;
if (num <= 0) return -1;
if (order == TSDB_ORDER_DESC) {
// find the first position which is smaller than the key
while (1) {
if (key >= getValuefn(keyList, lastPos)) return lastPos;
if (key == getValuefn(keyList, firstPos)) return firstPos;
if (key < getValuefn(keyList, firstPos)) return firstPos - 1;
numOfRows = lastPos - firstPos + 1;
midPos = (numOfRows >> 1) + firstPos;
if (key < getValuefn(keyList, midPos)) {
lastPos = midPos - 1;
} else if (key > getValuefn(keyList, midPos)) {
firstPos = midPos + 1;
} else {
break;
}
}
} else {
// find the first position which is bigger than the key
while (1) {
if (key <= getValuefn(keyList, firstPos)) return firstPos;
if (key == getValuefn(keyList, lastPos)) return lastPos;
if (key > getValuefn(keyList, lastPos)) {
lastPos = lastPos + 1;
if (lastPos >= num)
return -1;
else
return lastPos;
}
numOfRows = lastPos - firstPos + 1;
midPos = (numOfRows >> 1) + firstPos;
if (key < getValuefn(keyList, midPos)) {
lastPos = midPos - 1;
} else if (key > getValuefn(keyList, midPos)) {
firstPos = midPos + 1;
} else {
break;
}
}
}
return midPos;
}
int64_t getSessionWindowEndkey(void* data, int32_t index) {
SArray* pWinInfos = (SArray*) data;
SResultWindowInfo* pWin = taosArrayGet(pWinInfos, index);
return pWin->win.ekey;
}
static bool isInWindow(SResultWindowInfo* pWin, TSKEY ts, int64_t gap) {
int64_t sGap = ts - pWin->win.skey;
int64_t eGap = pWin->win.ekey - ts;
if ( (sGap < 0 && sGap >= -gap) || (eGap < 0 && eGap >= -gap) || (sGap >= 0 && eGap >= 0) ) {
return true;
}
return false;
}
static SResultWindowInfo* insertNewSessionWindow(SArray* pWinInfos, TSKEY ts,
int32_t index) {
SResultWindowInfo win =
{.pos.offset = -1, .pos.pageId = -1, .win.skey = ts, .win.ekey = ts, .isOutput = false};
return taosArrayInsert(pWinInfos, index, &win);
}
static SResultWindowInfo* addNewSessionWindow(SArray* pWinInfos, TSKEY ts) {
SResultWindowInfo win =
{.pos.offset = -1, .pos.pageId = -1, .win.skey = ts, .win.ekey = ts, .isOutput = false};
return taosArrayPush(pWinInfos, &win);
}
SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap,
int32_t* pIndex) {
int32_t size = taosArrayGetSize(pWinInfos);
if (size == 0) {
return addNewSessionWindow(pWinInfos, ts);
}
// find the first position which is smaller than the key
int32_t index = binarySearch(pWinInfos, size, ts, TSDB_ORDER_DESC,
getSessionWindowEndkey);
SResultWindowInfo* pWin = NULL;
if (index >= 0) {
pWin = taosArrayGet(pWinInfos, index);
if (isInWindow(pWin, ts, gap)) {
*pIndex = index;
return pWin;
}
}
if (index + 1 < size) {
pWin = taosArrayGet(pWinInfos, index + 1);
if (isInWindow(pWin, ts, gap)) {
*pIndex = index + 1;
return pWin;
}
}
if (index == size - 1) {
*pIndex = taosArrayGetSize(pWinInfos);
return addNewSessionWindow(pWinInfos, ts);
}
*pIndex = index;
return insertNewSessionWindow(pWinInfos, ts, index);
}
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t rows,
int32_t start, int64_t gap, SHashObj* pStDeleted) {
for (int32_t i = start; i < rows; ++i) {
if (!isInWindow(pWinInfo, pTs[i], gap)) {
return i - start;
}
if (pWinInfo->win.skey > pTs[i]) {
if (pStDeleted && pWinInfo->isOutput) {
taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &pWinInfo->win.skey, sizeof(TSKEY));
pWinInfo->isOutput = false;
}
pWinInfo->win.skey = pTs[i];
}
pWinInfo->win.ekey = TMAX(pWinInfo->win.ekey, pTs[i]);
}
return rows - start;
}
static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pResult,
SqlFunctionCtx* pCtx, int32_t groupId, int32_t numOfOutput,
int32_t* rowCellInfoOffset, SStreamAggSupporter* pAggSup, SExecTaskInfo* pTaskInfo) {
assert(pWinInfo->win.skey <= pWinInfo->win.ekey);
// too many time window in query
int32_t size = taosArrayGetSize(pAggSup->pResultRows);
if (size > MAX_INTERVAL_TIME_WINDOW) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
}
if (pWinInfo->pos.pageId == -1) {
*pResult = getNewResultRow_rv(pAggSup->pResultBuf, groupId, pAggSup->resultRowSize);
if (*pResult == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
initResultRow(*pResult);
// add a new result set for a new group
pWinInfo->pos.pageId = (*pResult)->pageId;
pWinInfo->pos.offset = (*pResult)->offset;
} else {
*pResult = getResultRowByPos(pAggSup->pResultBuf, &pWinInfo->pos);
if (!(*pResult)) {
qError("getResultRowByPos return NULL, TID:%s", GET_TASKID(pTaskInfo));
return TSDB_CODE_FAILED;
}
}
// set time window for current result
(*pResult)->win = pWinInfo->win;
setResultRowInitCtx(*pResult, pCtx, numOfOutput, rowCellInfoOffset);
return TSDB_CODE_SUCCESS;
}
static int32_t doOneWindowAgg(SStreamSessionAggOperatorInfo* pInfo,
SSDataBlock* pSDataBlock, SResultWindowInfo* pCurWin, SResultRow** pResult,
int32_t startIndex, int32_t winRows, int32_t numOutput, SExecTaskInfo* pTaskInfo ) {
SColumnInfoData* pColDataInfo =
taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
TSKEY* tsCols = (int64_t*)pColDataInfo->pData;
int32_t code = setWindowOutputBuf(pCurWin, pResult, pInfo->binfo.pCtx, pSDataBlock->info.groupId,
numOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->streamAggSup, pTaskInfo);
if (code != TSDB_CODE_SUCCESS || (*pResult) == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pCurWin->win, true);
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &pCurWin->win,
&pInfo->twAggSup.timeWindowData, startIndex, winRows, tsCols, pSDataBlock->info.rows,
numOutput, TSDB_ORDER_ASC);
return TSDB_CODE_SUCCESS;
}
int32_t copyWinInfoToDataBlock(SSDataBlock* pBlock, SStreamAggSupporter* pAggSup,
int32_t start, int32_t num, int32_t numOfExprs, SOptrBasicInfo* pBinfo) {
for (int32_t i = start; i < num; i += 1) {
SResultWindowInfo* pWinInfo = taosArrayGet(pAggSup->pResultRows, start);
SFilePage* bufPage = getBufPage(pAggSup->pResultBuf, pWinInfo->pos.pageId);
SResultRow* pRow = (SResultRow*)((char*)bufPage + pWinInfo->pos.offset);
for (int32_t j = 0; j < numOfExprs; ++j) {
SResultRowEntryInfo* pResultInfo = getResultCell(pRow, j, pBinfo->rowCellInfoOffset);
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, j);
char* in = GET_ROWCELL_INTERBUF(pBinfo->pCtx[j].resultInfo);
colDataAppend(pColInfoData, pBlock->info.rows, in, pResultInfo->isNullRes);
}
pBlock->info.rows += pRow->numOfRows;
releaseBufPage(pAggSup->pResultBuf, bufPage);
}
blockDataUpdateTsWindow(pBlock, -1);
return TSDB_CODE_SUCCESS;
}
int32_t getNumCompactWindow(SArray* pWinInfos, int32_t startIndex, int64_t gap) {
SResultWindowInfo* pCurWin = taosArrayGet(pWinInfos, startIndex);
int32_t size = taosArrayGetSize(pWinInfos);
// Just look for the window behind StartIndex
for (int32_t i = startIndex + 1; i < size; i++) {
SResultWindowInfo* pWinInfo = taosArrayGet(pWinInfos, i);
if (!isInWindow(pCurWin, pWinInfo->win.skey, gap)) {
return i - startIndex - 1;
}
}
return size - startIndex - 1;
}
void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx,
int32_t numOfOutput, SExecTaskInfo* pTaskInfo) {
for (int32_t k = 0; k < numOfOutput; ++k) {
if (fmIsWindowPseudoColumnFunc(pDestCtx[k].functionId)) {
continue;
}
int32_t code = TSDB_CODE_SUCCESS;
if (functionNeedToExecute(&pDestCtx[k]) && pDestCtx[k].fpSet.combine != NULL) {
code = pDestCtx[k].fpSet.combine(&pDestCtx[k], &pSourceCtx[k]);
if (code != TSDB_CODE_SUCCESS) {
qError("%s apply functions error, code: %s", GET_TASKID(pTaskInfo), tstrerror(code));
pTaskInfo->code = code;
longjmp(pTaskInfo->env, code);
}
}
}
}
void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex, int32_t num,
int32_t groupId, int32_t numOfOutput, SExecTaskInfo* pTaskInfo, SHashObj* pStUpdated, SHashObj* pStDeleted) {
SResultWindowInfo* pCurWin = taosArrayGet(pInfo->streamAggSup.pResultRows, startIndex);
SResultRow* pCurResult = NULL;
setWindowOutputBuf(pCurWin, &pCurResult, pInfo->binfo.pCtx, groupId,
numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->streamAggSup, pTaskInfo);
num += startIndex + 1;
ASSERT(num <= taosArrayGetSize(pInfo->streamAggSup.pResultRows));
// Just look for the window behind StartIndex
for (int32_t i = startIndex + 1; i < num; i++) {
SResultWindowInfo* pWinInfo = taosArrayGet(pInfo->streamAggSup.pResultRows, i);
SResultRow* pWinResult = NULL;
setWindowOutputBuf(pWinInfo, &pWinResult, pInfo->pDummyCtx, groupId,
numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->streamAggSup, pTaskInfo);
pCurWin->win.ekey = TMAX(pCurWin->win.ekey, pWinInfo->win.ekey);
compactFunctions(pInfo->binfo.pCtx, pInfo->pDummyCtx, numOfOutput, pTaskInfo);
taosHashRemove(pStUpdated, &pWinInfo->pos, sizeof(SResultRowPosition));
if (pWinInfo->isOutput) {
taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &pWinInfo->win.skey, sizeof(TSKEY));
pWinInfo->isOutput = false;
}
taosArrayRemove(pInfo->streamAggSup.pResultRows, i);
}
}
static void doStreamSessionWindowAggImpl(SOperatorInfo* pOperator,
SSDataBlock* pSDataBlock, SHashObj* pStUpdated, SHashObj* pStDeleted) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
bool masterScan = true;
int32_t numOfOutput = pOperator->numOfExprs;
int64_t groupId = pSDataBlock->info.groupId;
int64_t gap = pInfo->gap;
int64_t code = TSDB_CODE_SUCCESS;
int32_t step = 1;
bool ascScan = true;
TSKEY* tsCols = NULL;
SResultRow* pResult = NULL;
int32_t winRows = 0;
if (pSDataBlock->pDataBlock != NULL) {
SColumnInfoData* pColDataInfo =
taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
tsCols = (int64_t*)pColDataInfo->pData;
} else {
return ;
}
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
for(int32_t i = 0; i < pSDataBlock->info.rows; ) {
int32_t winIndex = 0;
SResultWindowInfo* pCurWin =
getSessionTimeWindow(pAggSup->pResultRows, tsCols[i], gap, &winIndex);
winRows =
updateSessionWindowInfo(pCurWin, tsCols, pSDataBlock->info.rows, i, pInfo->gap, pStDeleted);
code = doOneWindowAgg(pInfo, pSDataBlock, pCurWin, &pResult, i, winRows, numOfOutput, pTaskInfo);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
// window start(end) key interpolation
// doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep,
// pInfo->order, false);
int32_t winNum = getNumCompactWindow(pAggSup->pResultRows, winIndex, gap);
if (winNum > 0) {
compactTimeWindow(pInfo, winIndex, winNum, groupId, numOfOutput, pTaskInfo, pStUpdated, pStDeleted);
}
code = taosHashPut(pStUpdated, &pCurWin->pos, sizeof(SResultRowPosition), &(pCurWin->win.skey), sizeof(TSKEY));
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pCurWin->isOutput = true;
i += winRows;
}
}
static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SOptrBasicInfo* pBinfo,
SSDataBlock* pBlock, int32_t tsIndex, int32_t numOfOutput, int64_t gap) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, tsIndex);
TSKEY *tsCols = (TSKEY*)pColDataInfo->pData;
int32_t step = 0;
for (int32_t i = 0; i < pBlock->info.rows; i += step) {
int32_t winIndex = 0;
SResultWindowInfo* pCurWin =
getSessionTimeWindow(pAggSup->pResultRows, tsCols[i], gap, &winIndex);
step = updateSessionWindowInfo(pCurWin, tsCols, pBlock->info.rows, i, gap, NULL);
doClearWindowImpl(&pCurWin->pos, pAggSup->pResultBuf, pBinfo, numOfOutput);
}
}
static int32_t copyUpdateResult(SHashObj* pStUpdated, SArray* pUpdated, int32_t groupId) {
void* pData = NULL;
size_t keyLen = 0;
while((pData = taosHashIterate(pStUpdated, pData)) != NULL) {
void* key = taosHashGetKey(pData, &keyLen);
ASSERT(keyLen == sizeof(SResultRowPosition));
SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
if (pos == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
pos->groupId = groupId;
pos->pos = *(SResultRowPosition*)key;
*(int64_t*)pos->key = *(uint64_t*)pData;
taosArrayPush(pUpdated, &pos);
}
return TSDB_CODE_SUCCESS;
}
void doBuildDeleteDataBlock(SHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite) {
blockDataCleanup(pBlock);
size_t keyLen = 0;
while(( (*Ite) = taosHashIterate(pStDeleted, *Ite)) != NULL) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0);
colDataAppend(pColInfoData, pBlock->info.rows, *Ite, false);
for (int32_t i = 1; i < pBlock->info.numOfCols; i++) {
pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
colDataAppendNULL(pColInfoData, pBlock->info.rows);
}
pBlock->info.rows += 1;
if (pBlock->info.rows + 1 >= pBlock->info.capacity) {
break;
}
}
if ((*Ite) == NULL) {
taosHashClear(pStDeleted);
}
}
static SSDataBlock* doStreamSessionWindowAgg(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
SOptrBasicInfo* pBInfo = &pInfo->binfo;
if (pOperator->status == OP_RES_TO_RETURN) {
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
return pInfo->pDelRes;
}
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo,
pInfo->streamAggSup.pResultBuf);
if (pBInfo->pRes->info.rows == 0 ||
!hashRemainDataInGroupInfo(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
}
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
}
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
SHashObj* pStUpdated = taosHashInit(64, hashFn, true, HASH_NO_LOCK);
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
break;
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
if (pBlock->info.type == STREAM_REPROCESS) {
doClearSessionWindows(&pInfo->streamAggSup, &pInfo->binfo, pBlock, 0,
pOperator->numOfExprs, pInfo->gap);
continue;
}
doStreamSessionWindowAggImpl(pOperator, pBlock, pStUpdated, pInfo->pStDeleted);
}
// restore the value
pOperator->status = OP_RES_TO_RETURN;
SArray* pUpdated = taosArrayInit(16, POINTER_BYTES);
copyUpdateResult(pStUpdated, pUpdated, pBInfo->pRes->info.groupId);
taosHashCleanup(pStUpdated);
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated,
pInfo->binfo.rowCellInfoOffset);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
return pInfo->pDelRes;
}
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo,
pInfo->streamAggSup.pResultBuf);
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
}
......@@ -37,6 +37,7 @@ typedef struct SBuiltinFuncDefinition {
FScalarExecProcess sprocessFunc;
FExecFinalize finalizeFunc;
FExecProcess invertFunc;
FExecCombine combineFunc;
} SBuiltinFuncDefinition;
extern const SBuiltinFuncDefinition funcMgtBuiltins[];
......
......@@ -27,6 +27,7 @@ bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t dummyProcess(SqlFunctionCtx* UNUSED_PARAM(pCtx));
int32_t functionFinalizeWithResultBuf(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, char* finalResult);
int32_t combineFunction(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow);
bool getCountFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
......@@ -37,24 +38,29 @@ EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWin
bool getSumFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t sumFunction(SqlFunctionCtx *pCtx);
int32_t sumInvertFunction(SqlFunctionCtx *pCtx);
int32_t sumCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
bool minmaxFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
bool getMinmaxFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t minFunction(SqlFunctionCtx* pCtx);
int32_t maxFunction(SqlFunctionCtx *pCtx);
int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t minCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t maxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
bool getAvgFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool avgFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t avgFunction(SqlFunctionCtx* pCtx);
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t avgInvertFunction(SqlFunctionCtx* pCtx);
int32_t avgCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t stddevFunction(SqlFunctionCtx* pCtx);
int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t stddevInvertFunction(SqlFunctionCtx* pCtx);
int32_t stddevCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
bool getLeastSQRFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool leastSQRFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
......@@ -73,8 +79,10 @@ int32_t diffFunction(SqlFunctionCtx *pCtx);
bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t firstFunction(SqlFunctionCtx *pCtx);
int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t lastFunction(SqlFunctionCtx *pCtx);
int32_t lastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
int32_t topFunction(SqlFunctionCtx *pCtx);
......
......@@ -745,7 +745,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = functionSetup,
.processFunc = countFunction,
.finalizeFunc = functionFinalize,
.invertFunc = countInvertFunction
.invertFunc = countInvertFunction,
.combineFunc = combineFunction,
},
{
.name = "sum",
......@@ -757,7 +758,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = functionSetup,
.processFunc = sumFunction,
.finalizeFunc = functionFinalize,
.invertFunc = sumInvertFunction
.invertFunc = sumInvertFunction,
.combineFunc = sumCombine,
},
{
.name = "min",
......@@ -768,7 +770,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getMinmaxFuncEnv,
.initFunc = minmaxFunctionSetup,
.processFunc = minFunction,
.finalizeFunc = minmaxFunctionFinalize
.finalizeFunc = minmaxFunctionFinalize,
.combineFunc = minCombine
},
{
.name = "max",
......@@ -779,7 +782,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getMinmaxFuncEnv,
.initFunc = minmaxFunctionSetup,
.processFunc = maxFunction,
.finalizeFunc = minmaxFunctionFinalize
.finalizeFunc = minmaxFunctionFinalize,
.combineFunc = maxCombine
},
{
.name = "stddev",
......@@ -790,7 +794,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = stddevFunctionSetup,
.processFunc = stddevFunction,
.finalizeFunc = stddevFinalize,
.invertFunc = stddevInvertFunction
.invertFunc = stddevInvertFunction,
.combineFunc = stddevCombine,
},
{
.name = "leastsquares",
......@@ -801,7 +806,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = leastSQRFunctionSetup,
.processFunc = leastSQRFunction,
.finalizeFunc = leastSQRFinalize,
.invertFunc = leastSQRInvertFunction
.invertFunc = leastSQRInvertFunction,
},
{
.name = "avg",
......@@ -812,7 +817,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = avgFunctionSetup,
.processFunc = avgFunction,
.finalizeFunc = avgFinalize,
.invertFunc = avgInvertFunction
.invertFunc = avgInvertFunction,
.combineFunc = avgCombine,
},
{
.name = "percentile",
......@@ -894,7 +900,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup,
.processFunc = firstFunction,
.finalizeFunc = functionFinalize
.finalizeFunc = functionFinalize,
.combineFunc = firstCombine,
},
{
.name = "last",
......@@ -904,7 +911,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup,
.processFunc = lastFunction,
.finalizeFunc = lastFinalize
.finalizeFunc = lastFinalize,
.combineFunc = lastCombine,
},
{
.name = "histogram",
......
......@@ -292,6 +292,24 @@ int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return pResInfo->numOfRes;
}
int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
char* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
int32_t type = pDestCtx->input.pData[0]->info.type;
int32_t bytes = pDestCtx->input.pData[0]->info.bytes;
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
char* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
if (pSResInfo->numOfRes != 0 &&
(pDResInfo->numOfRes == 0 || *(TSKEY*)(pDBuf + bytes) > *(TSKEY*)(pSBuf + bytes)) ) {
memcpy(pDBuf, pSBuf, bytes);
*(TSKEY*)(pDBuf + bytes) = *(TSKEY*)(pSBuf + bytes);
pDResInfo->numOfRes = 1;
}
return TSDB_CODE_SUCCESS;
}
int32_t dummyProcess(SqlFunctionCtx* UNUSED_PARAM(pCtx)) {
return 0;
}
......@@ -388,6 +406,18 @@ int32_t countInvertFunction(SqlFunctionCtx* pCtx) {
return TSDB_CODE_SUCCESS;
}
int32_t combineFunction(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
char* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
char* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
*((int64_t*)pDBuf) += *((int64_t*)pSBuf);
SET_VAL(pDResInfo, *((int64_t*)pDBuf), 1);
return TSDB_CODE_SUCCESS;
}
#define LIST_ADD_N(_res, _col, _start, _rows, _t, numOfElem) \
do { \
_t* d = (_t*)(_col->pData); \
......@@ -537,6 +567,26 @@ int32_t sumInvertFunction(SqlFunctionCtx* pCtx) {
return TSDB_CODE_SUCCESS;
}
int32_t sumCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
SSumRes* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
int32_t type = pDestCtx->input.pData[0]->info.type;
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
SSumRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL) {
pDBuf->isum += pSBuf->isum;
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
pDBuf->usum += pSBuf->usum;
} else if (type == TSDB_DATA_TYPE_DOUBLE || type == TSDB_DATA_TYPE_FLOAT) {
pDBuf->dsum += pSBuf->dsum;
}
SET_VAL(pDResInfo, *((int64_t*)pDBuf), 1);
return TSDB_CODE_SUCCESS;
}
bool getSumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SSumRes);
return true;
......@@ -738,6 +788,24 @@ int32_t avgInvertFunction(SqlFunctionCtx* pCtx) {
return TSDB_CODE_SUCCESS;
}
int32_t avgCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
SAvgRes* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
int32_t type = pDestCtx->input.pData[0]->info.type;
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
SAvgRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
if (IS_INTEGER_TYPE(type)) {
pDBuf->sum.isum += pSBuf->sum.isum;
} else {
pDBuf->sum.dsum += pSBuf->sum.dsum;
}
pDBuf->count += pSBuf->count;
return TSDB_CODE_SUCCESS;
}
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SInputColumnInfoData* pInput = &pCtx->input;
int32_t type = pInput->pData[0]->info.type;
......@@ -1273,6 +1341,34 @@ void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuple
}
}
int32_t minMaxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int32_t isMinFunc) {
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
SMinmaxResInfo* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
int32_t type = pDestCtx->input.pData[0]->info.type;
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
SMinmaxResInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
if (IS_FLOAT_TYPE(type)) {
if (pSBuf->assign &&
( (((*(double*)&pDBuf->v) < (*(double*)&pSBuf->v)) ^ isMinFunc) || !pDBuf->assign ) ) {
*(double*) &pDBuf->v = *(double*) &pSBuf->v;
}
} else {
if ( pSBuf->assign && ( ((pDBuf->v < pSBuf->v) ^ isMinFunc) || !pDBuf->assign ) ) {
pDBuf->v = pSBuf->v;
}
}
SET_VAL(pDResInfo, *((int64_t*)pDBuf), 1);
return TSDB_CODE_SUCCESS;
}
int32_t minCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
return minMaxCombine(pDestCtx, pSourceCtx, 1);
}
int32_t maxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
return minMaxCombine(pDestCtx, pSourceCtx, 0);
}
bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SStddevRes);
return true;
......@@ -1491,6 +1587,25 @@ int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return functionFinalize(pCtx, pBlock);
}
int32_t stddevCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
SStddevRes* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
int32_t type = pDestCtx->input.pData[0]->info.type;
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
SStddevRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
if (IS_INTEGER_TYPE(type)) {
pDBuf->isum += pSBuf->isum;
pDBuf->quadraticISum += pSBuf->quadraticISum;
} else {
pDBuf->dsum += pSBuf->dsum;
pDBuf->quadraticDSum += pSBuf->quadraticDSum;
}
pDBuf->count += pSBuf->count;
return TSDB_CODE_SUCCESS;
}
bool getLeastSQRFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SLeastSQRInfo);
return true;
......@@ -1979,6 +2094,24 @@ int32_t lastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return pResInfo->numOfRes;
}
int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
char* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
int32_t type = pDestCtx->input.pData[0]->info.type;
int32_t bytes = pDestCtx->input.pData[0]->info.bytes;
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
char* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
if (pSResInfo->numOfRes != 0 &&
(pDResInfo->numOfRes == 0 || *(TSKEY*)(pDBuf + bytes) < *(TSKEY*)(pSBuf + bytes)) ) {
memcpy(pDBuf, pSBuf, bytes);
*(TSKEY*)(pDBuf + bytes) = *(TSKEY*)(pSBuf + bytes);
pDResInfo->numOfRes = 1;
}
return TSDB_CODE_SUCCESS;
}
bool getDiffFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SDiffInfo);
return true;
......
......@@ -118,6 +118,7 @@ int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) {
pFpSet->init = funcMgtBuiltins[funcId].initFunc;
pFpSet->process = funcMgtBuiltins[funcId].processFunc;
pFpSet->finalize = funcMgtBuiltins[funcId].finalizeFunc;
pFpSet->combine = funcMgtBuiltins[funcId].combineFunc;
return TSDB_CODE_SUCCESS;
}
......
......@@ -230,6 +230,8 @@ const char* nodesNodeName(ENodeType type) {
return "PhysiFill";
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:
return "PhysiSessionWindow";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW:
return "PhysiStreamSessionWindow";
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW:
return "PhysiStateWindow";
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
......@@ -2528,6 +2530,29 @@ static int32_t jsonToOrderByExprNode(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkSessionWindowTsPrimaryKey = "TsPrimaryKey";
static const char* jkSessionWindowGap = "Gap";
static int32_t sessionWindowNodeToJson(const void* pObj, SJson* pJson) {
const SSessionWindowNode * pNode = (const SSessionWindowNode*)pObj;
int32_t code = tjsonAddObject(pJson, jkSessionWindowTsPrimaryKey, nodeToJson, pNode->pCol);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkSessionWindowGap, nodeToJson, pNode->pGap);
}
return code;
}
static int32_t jsonToSessionWindowNode(const SJson* pJson, void* pObj) {
SSessionWindowNode* pNode = (SSessionWindowNode*)pObj;
int32_t code = jsonToNodeObject(pJson, jkSessionWindowTsPrimaryKey, (SNode **)&pNode->pCol);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkSessionWindowGap, (SNode **)&pNode->pGap);
}
return code;
}
static const char* jkIntervalWindowInterval = "Interval";
static const char* jkIntervalWindowOffset = "Offset";
static const char* jkIntervalWindowSliding = "Sliding";
......@@ -3015,8 +3040,9 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return orderByExprNodeToJson(pObj, pJson);
case QUERY_NODE_LIMIT:
case QUERY_NODE_STATE_WINDOW:
case QUERY_NODE_SESSION_WINDOW:
break;
case QUERY_NODE_SESSION_WINDOW:
return sessionWindowNodeToJson(pObj, pJson);
case QUERY_NODE_INTERVAL_WINDOW:
return intervalWindowNodeToJson(pObj, pJson);
case QUERY_NODE_NODE_LIST:
......@@ -3096,6 +3122,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case QUERY_NODE_PHYSICAL_PLAN_FILL:
return physiFillNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW:
return physiSessionWindowNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW:
return physiStateWindowNodeToJson(pObj, pJson);
......@@ -3134,6 +3161,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToTempTableNode(pJson, pObj);
case QUERY_NODE_ORDER_BY_EXPR:
return jsonToOrderByExprNode(pJson, pObj);
case QUERY_NODE_SESSION_WINDOW:
return jsonToSessionWindowNode(pJson, pObj);
case QUERY_NODE_INTERVAL_WINDOW:
return jsonToIntervalWindowNode(pJson, pObj);
case QUERY_NODE_NODE_LIST:
......@@ -3196,6 +3225,7 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
case QUERY_NODE_PHYSICAL_PLAN_FILL:
return jsonToPhysiFillNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW:
return jsonToPhysiSessionWindowNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW:
return jsonToPhysiStateWindowNode(pJson, pObj);
......
......@@ -517,6 +517,7 @@ static EDealRes dispatchPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalk
res = walkWindowPhysi((SWinodwPhysiNode*)pNode, order, walker, pContext);
break;
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW:
res = walkWindowPhysi((SWinodwPhysiNode*)pNode, order, walker, pContext);
break;
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW: {
......
......@@ -251,6 +251,8 @@ int32_t nodesNodeSize(ENodeType type) {
return sizeof(SFillPhysiNode);
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:
return sizeof(SSessionWinodwPhysiNode);
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW:
return sizeof(SStreamSessionWinodwPhysiNode);
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW:
return sizeof(SStateWinodwPhysiNode);
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
......@@ -664,6 +666,7 @@ void nodesDestroyNode(SNodeptr pNode) {
destroyWinodwPhysiNode((SWinodwPhysiNode*)pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW:
destroyWinodwPhysiNode((SWinodwPhysiNode*)pNode);
break;
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
......
......@@ -945,7 +945,8 @@ static int32_t createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil
static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
SSessionWinodwPhysiNode* pSession = (SSessionWinodwPhysiNode*)makePhysiNode(
pCxt, getPrecision(pChildren), (SLogicNode*)pWindowLogicNode, QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW);
pCxt, getPrecision(pChildren), (SLogicNode*)pWindowLogicNode,
(pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW : QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW));
if (NULL == pSession) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......
......@@ -127,7 +127,10 @@ static SScalableBf *getSBf(SUpdateInfo *pInfo, TSKEY ts) {
if (pInfo->minTS < 0) {
pInfo->minTS = (TSKEY)(ts / pInfo->interval * pInfo->interval);
}
uint64_t index = (uint64_t)((ts - pInfo->minTS) / pInfo->interval);
int64_t index = (int64_t)((ts - pInfo->minTS) / pInfo->interval);
if (index < 0) {
return NULL;
}
if (index >= pInfo->numSBFs) {
uint64_t count = index + 1 - pInfo->numSBFs;
windowSBfDelete(pInfo, count);
......
......@@ -67,6 +67,8 @@
# ---- stream
./test.sh -f tsim/stream/basic0.sim
./test.sh -f tsim/stream/basic1.sim
./test.sh -f tsim/stream/session0.sim
./test.sh -f tsim/stream/session1.sim
# ---- transaction
./test.sh -f tsim/trans/lossdata1.sim
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print =============== create database
sql create database test vgroups 1
sql show databases
if $rows != 3 then
return -1
endi
print $data00 $data01 $data02
sql use test
sql create table t1(ts timestamp, a int, b int , c int, d double,id int);
sql create stream streams2 trigger at_once into streamt as select _wstartts, count(*) c1, sum(a), max(a), min(d), stddev(a), last(a), first(d), max(id) s from t1 session(ts,10s);
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL,1);
sql insert into t1 values(1648791223001,10,2,3,1.1,2);
sql insert into t1 values(1648791233002,3,2,3,2.1,3);
sql insert into t1 values(1648791243003,NULL,NULL,NULL,NULL,4);
sql insert into t1 values(1648791213002,NULL,NULL,NULL,NULL,5) (1648791233012,NULL,NULL,NULL,NULL,6);
sql select * from streamt order by s desc;
# row 0
if $data01 != 3 then
print ======$data01
return -1
endi
if $data02 != 3 then
print ======$data02
return -1
endi
if $data03 != 3 then
print ======$data03
return -1
endi
if $data04 != 2.100000000 then
print ======$data04
return -1
endi
if $data05 != 0.000000000 then
print ======$data05
return -1
endi
if $data06 != 3 then
print ======$data05
return -1
endi
if $data07 != 2.100000000 then
print ======$data05
return -1
endi
if $data08 != 6 then
print ======$data05
return -1
endi
# row 1
if $data11 != 3 then
print ======$data01
return -1
endi
if $data12 != 10 then
print ======$data02
return -1
endi
if $data13 != 10 then
print ======$data03
return -1
endi
if $data14 != 1.100000000 then
print ======$data04
return -1
endi
if $data15 != 0.000000000 then
print ======$data05
return -1
endi
if $data16 != 10 then
print ======$data05
return -1
endi
if $data17 != 1.100000000 then
print ======$data05
return -1
endi
if $data18 != 5 then
print ======$data05
return -1
endi
sql insert into t1 values(1648791213000,1,2,3,1.0,7);
sql insert into t1 values(1648791223001,2,2,3,1.1,8);
sql insert into t1 values(1648791233002,3,2,3,2.1,9);
sql insert into t1 values(1648791243003,4,2,3,3.1,10);
sql insert into t1 values(1648791213002,4,2,3,4.1,11) ;
sql insert into t1 values(1648791213002,4,2,3,4.1,12) (1648791223009,4,2,3,4.1,13);
sql select * from streamt order by s desc ;
# row 0
if $data01 != 7 then
print ======$data01
return -1
endi
if $data02 != 9 then
print ======$data02
return -1
endi
if $data03 != 4 then
print ======$data03
return -1
endi
if $data04 != 1.100000000 then
print ======$data04
return -1
endi
if $data05 != 0.816496581 then
print ======$data05
return -1
endi
if $data06 != 3 then
print ======$data05
return -1
endi
if $data07 != 1.100000000 then
print ======$data05
return -1
endi
if $data08 != 13 then
print ======$data05
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print =============== create database
sql create database test vgroups 1
sql show databases
if $rows != 3 then
return -1
endi
print $data00 $data01 $data02
sql use test
sql create table t1(ts timestamp, a int, b int , c int, d double,id int);
sql create stream streams2 trigger at_once into streamt as select _wstartts, count(*) c1, sum(a), min(b), max(id) s from t1 session(ts,10s);
sql insert into t1 values(1648791210000,1,1,1,1.1,1);
sql insert into t1 values(1648791220000,2,2,2,2.1,2);
sql insert into t1 values(1648791230000,3,3,3,3.1,3);
sql insert into t1 values(1648791240000,4,4,4,4.1,4);
sql select * from streamt order by s desc;
# row 0
if $data01 != 4 then
print ======$data01
return -1
endi
if $data02 != 10 then
print ======$data02
return -1
endi
if $data03 != 1 then
print ======$data03
return -1
endi
if $data04 != 4 then
print ======$data04
return -1
endi
sql insert into t1 values(1648791250005,5,5,5,5.1,5);
sql insert into t1 values(1648791260006,6,6,6,6.1,6);
sql insert into t1 values(1648791270007,7,7,7,7.1,7);
sql insert into t1 values(1648791240005,5,5,5,5.1,8) (1648791250006,6,6,6,6.1,9);
sql select * from streamt order by s desc;
# row 0
if $data01 != 8 then
print ======$data01
return -1
endi
if $data02 != 32 then
print ======$data02
return -1
endi
if $data03 != 1 then
print ======$data03
return -1
endi
if $data04 != 9 then
print ======$data04
return -1
endi
# row 1
if $data11 != 1 then
print ======$data11
return -1
endi
if $data12 != 7 then
print ======$data12
return -1
endi
if $data13 != 7 then
print ======$data13
return -1
endi
if $data14 != 7 then
print ======$data14
return -1
endi
sql insert into t1 values(1648791280008,7,7,7,7.1,10) (1648791300009,8,8,8,8.1,11);
sql insert into t1 values(1648791260007,7,7,7,7.1,12) (1648791290008,7,7,7,7.1,13) (1648791290009,8,8,8,8.1,14);
sql insert into t1 values(1648791500000,7,7,7,7.1,15) (1648791520000,8,8,8,8.1,16) (1648791540000,8,8,8,8.1,17);
sql insert into t1 values(1648791530000,8,8,8,8.1,18);
sql insert into t1 values(1648791220000,10,10,10,10.1,19) (1648791290008,2,2,2,2.1,20) (1648791540000,17,17,17,17.1,21) (1648791500001,22,22,22,22.1,22);
sql select * from streamt order by s desc;
# row 0
if $data01 != 2 then
print ======$data01
return -1
endi
if $data02 != 29 then
print ======$data02
return -1
endi
if $data03 != 7 then
print ======$data03
return -1
endi
if $data04 != 22 then
print ======$data04
return -1
endi
# row 1
if $data11 != 3 then
print ======$data11
return -1
endi
if $data12 != 33 then
print ======$data12
return -1
endi
if $data13 != 8 then
print ======$data13
return -1
endi
if $data14 != 21 then
print ======$data14
return -1
endi
# row 2
if $data21 != 4 then
print ======$data21
return -1
endi
if $data22 != 25 then
print ======$data22
return -1
endi
if $data23 != 2 then
print ======$data23
return -1
endi
if $data24 != 20 then
print ======$data24
return -1
endi
# row 3
if $data31 != 10 then
print ======$data31
return -1
endi
if $data32 != 54 then
print ======$data32
return -1
endi
if $data33 != 1 then
print ======$data33
return -1
endi
if $data34 != 19 then
print ======$data34
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册