提交 9a5123c6 作者: Haojun Liao

[td-13039] add times for session window

上级 05038ded
...@@ -183,11 +183,11 @@ typedef struct SqlFunctionCtx { ...@@ -183,11 +183,11 @@ typedef struct SqlFunctionCtx {
int32_t columnIndex; // TODO remove it int32_t columnIndex; // TODO remove it
uint8_t currentStage; // record current running step, default: 0 uint8_t currentStage; // record current running step, default: 0
bool isAggSet; bool isAggSet;
int64_t startTs; // timestamp range of current query when function is executed on a specific data block, TODO remove it
///////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////
bool stableQuery; bool stableQuery;
int16_t functionId; // function id int16_t functionId; // function id
char * pOutput; // final result output buffer, point to sdata->data char * pOutput; // final result output buffer, point to sdata->data
int64_t startTs; // timestamp range of current query when function is executed on a specific data block
int32_t numOfParams; int32_t numOfParams;
SVariant param[4]; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param SVariant param[4]; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param
int64_t *ptsList; // corresponding timestamp array list int64_t *ptsList; // corresponding timestamp array list
......
...@@ -550,15 +550,16 @@ typedef struct SGroupbyOperatorInfo { ...@@ -550,15 +550,16 @@ typedef struct SGroupbyOperatorInfo {
} SGroupbyOperatorInfo; } SGroupbyOperatorInfo;
// Per-operator state of the session-window aggregation operator.
// Each row of this listing shows the pre-change text followed by the post-change text of the commit;
// timeWindowData exists on the post-change side only.
// curWindow/prevTs/numOfRows/start describe the session that is currently open while rows are consumed;
// gap is the distance threshold compared against consecutive timestamps.
typedef struct SSessionAggOperatorInfo { typedef struct SSessionAggOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
SAggSupporter aggSup; SAggSupporter aggSup;
SGroupResInfo groupResInfo; SGroupResInfo groupResInfo;
STimeWindow curWindow; // current time window STimeWindow curWindow; // current time window
TSKEY prevTs; // previous timestamp TSKEY prevTs; // previous timestamp
int32_t numOfRows; // number of rows int32_t numOfRows; // number of rows
int32_t start; // start row index int32_t start; // start row index
bool reptScan; // next round scan bool reptScan; // next round scan
int64_t gap; // session window gap int64_t gap; // session window gap
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
} SSessionAggOperatorInfo; } SSessionAggOperatorInfo;
typedef struct SStateWindowOperatorInfo { typedef struct SStateWindowOperatorInfo {
......
...@@ -1024,29 +1024,25 @@ static void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQuer ...@@ -1024,29 +1024,25 @@ static void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQuer
colDataAppendInt64(pColData, 4, &pQueryWindow->ekey); colDataAppendInt64(pColData, 4, &pQueryWindow->ekey);
} }
// Refreshes the per-window entries of the time-window column from pWin.
// pColData->pData is viewed as an int64_t array: entry 2 receives the duration, entry 3 the window start key
// and entry 4 the window end key.
// Each row of this listing shows the pre-change text followed by the post-change text of the commit.
// Pre-change: 1 is always added to both the duration and the end key.
// Post-change: 1 is added only when includeEndpoint is true, otherwise the raw ekey/skey difference is stored.
// Assumes pData holds at least five int64_t entries — TODO confirm against the column's initialisation.
static void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin) { static void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, bool includeEndpoint) {
int64_t* ts = (int64_t*)pColData->pData; int64_t* ts = (int64_t*)pColData->pData;
int32_t delta = includeEndpoint? 1:0;
int64_t duration = pWin->ekey - pWin->skey + 1; int64_t duration = pWin->ekey - pWin->skey + delta;
ts[2] = duration; // set the duration ts[2] = duration; // set the duration
ts[3] = pWin->skey; // window start key ts[3] = pWin->skey; // window start key
ts[4] = pWin->ekey + 1; // window end key ts[4] = pWin->ekey + delta; // window end key
} }
static void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, static void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol,
int32_t numOfTotal, int32_t numOfOutput, int32_t order) { int32_t numOfTotal, int32_t numOfOutput, int32_t order) {
SScalarParam intervalParam = {.numOfRows = 5, .columnData = pTimeWindowData}; //TODO move out of this function
if (pTimeWindowData != NULL) {
updateTimeWindowInfo(pTimeWindowData, pWin);
}
for (int32_t k = 0; k < numOfOutput; ++k) { for (int32_t k = 0; k < numOfOutput; ++k) {
pCtx[k].startTs = pWin->skey; pCtx[k].startTs = pWin->skey;
// keep it temporarialy // keep it temporarialy
bool hasAgg = pCtx[k].input.colDataAggIsSet; bool hasAgg = pCtx[k].input.colDataAggIsSet;
int32_t numOfRows = pCtx[k].input.numOfRows;
int32_t startOffset = pCtx[k].input.startRowIndex; int32_t startOffset = pCtx[k].input.startRowIndex;
int32_t numOfRows = pCtx[k].input.numOfRows;
int32_t pos = (order == TSDB_ORDER_ASC) ? offset : offset - (forwardStep - 1); int32_t pos = (order == TSDB_ORDER_ASC) ? offset : offset - (forwardStep - 1);
pCtx[k].input.startRowIndex = pos; pCtx[k].input.startRowIndex = pos;
...@@ -1066,12 +1062,14 @@ static void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInf ...@@ -1066,12 +1062,14 @@ static void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInf
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[k]); SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[k]);
char* p = GET_ROWCELL_INTERBUF(pEntryInfo); char* p = GET_ROWCELL_INTERBUF(pEntryInfo);
SScalarParam out = {.columnData = NULL}; SColumnInfoData idata = {0};
out.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData)); idata.info.type = TSDB_DATA_TYPE_BIGINT;
out.columnData->info.type = TSDB_DATA_TYPE_BIGINT; idata.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
out.columnData->info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes; idata.pData = p;
out.columnData->pData = p;
pCtx[k].sfp.process(&intervalParam, 1, &out); SScalarParam out = {.columnData = &idata};
SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
pCtx[k].sfp.process(&tw, 1, &out);
pEntryInfo->numOfRes = 1; pEntryInfo->numOfRes = 1;
pEntryInfo->hasResult = ','; pEntryInfo->hasResult = ',';
continue; continue;
...@@ -1617,8 +1615,9 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe ...@@ -1617,8 +1615,9 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
#endif #endif
// window start key interpolation // window start key interpolation
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &win, startPos, forwardStep, doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &win, startPos, forwardStep, pInfo->order, false);
pInfo->order, false);
updateTimeWindowInfo(&pInfo->timeWindowData, &win, true);
doApplyFunctions(pInfo->binfo.pCtx, &win, &pInfo->timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); doApplyFunctions(pInfo->binfo.pCtx, &win, &pInfo->timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
STimeWindow nextWin = win; STimeWindow nextWin = win;
...@@ -1647,8 +1646,9 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe ...@@ -1647,8 +1646,9 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
// window start(end) key interpolation // window start(end) key interpolation
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep, doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep, pInfo->order, false);
pInfo->order, false);
updateTimeWindowInfo(&pInfo->timeWindowData, &win, true);
doApplyFunctions(pInfo->binfo.pCtx, &nextWin, &pInfo->timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); doApplyFunctions(pInfo->binfo.pCtx, &nextWin, &pInfo->timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
} }
...@@ -1915,73 +1915,80 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { ...@@ -1915,73 +1915,80 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
} }
} }
// Folds one more row into the session window that is currently open:
// counts the row, remembers its timestamp as the latest one seen, and
// moves the end key of the open window up to that timestamp.
static void doKeepTuple(SSessionAggOperatorInfo* pInfo, int64_t ts) {
  ++pInfo->numOfRows;
  pInfo->prevTs         = ts;
  pInfo->curWindow.ekey = ts;
}
// Opens a new session window at row rowIndex of tsList: the start key becomes
// that row's timestamp, the row counter is cleared and the start row index is
// recorded. The row itself is not counted here.
static void doKeepSessionStartInfo(SSessionAggOperatorInfo* pInfo, const int64_t* tsList, int32_t rowIndex) {
  const int64_t firstTs = tsList[rowIndex];

  pInfo->curWindow.skey = firstTs;
  pInfo->numOfRows      = 0;
  pInfo->start          = rowIndex;
}
// todo handle multiple tables cases. // todo handle multiple tables cases.
// Runs one data block through session-window aggregation.
// Each row of this listing shows the pre-change text followed by the post-change text of the commit;
// the notes below describe the post-change side.
//  - The timestamps are read from column 0 of pBlock.
//  - A row whose distance to pInfo->prevTs is within [0, gap] is added to the open session.
//  - Any other row closes the open session: its result row is prepared with setResultOutputBufByKey_rv, the
//    time-window column is refreshed, and doApplyFunctions is called with pInfo->start / pInfo->numOfRows as the
//    row offset and step. A new session is then opened at that row.
//  - After the loop the session that is still open is applied the same way.
//  - If setResultOutputBufByKey_rv fails, control leaves via longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR).
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) { static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
// primary timestamp column
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0);
bool masterScan = true; bool masterScan = true;
STimeWindow window = {0}; int32_t numOfOutput = pOperator->numOfOutput;
int32_t numOfOutput = pOperator->numOfOutput; int64_t gid = pBlock->info.groupId;
int64_t gid = pBlock->info.groupId;
int64_t gap = pInfo->gap; int64_t gap = pInfo->gap;
pInfo->numOfRows = 0; pInfo->numOfRows = 0;
if (/*IS_REPEAT_SCAN(pRuntimeEnv) && */ !pInfo->reptScan) { if (!pInfo->reptScan) {
pInfo->reptScan = true; pInfo->reptScan = true;
pInfo->prevTs = INT64_MIN; pInfo->prevTs = INT64_MIN;
} }
// In case of ascending or descending order scan data, only one time window needs to be kept for each table.
TSKEY* tsList = (TSKEY*)pColInfoData->pData; TSKEY* tsList = (TSKEY*)pColInfoData->pData;
for (int32_t j = 0; j < pBlock->info.rows; ++j) { for (int32_t j = 0; j < pBlock->info.rows; ++j) {
if (pInfo->prevTs == INT64_MIN) { if (pInfo->prevTs == INT64_MIN) {
pInfo->curWindow.skey = tsList[j]; doKeepSessionStartInfo(pInfo, tsList, j);
pInfo->curWindow.ekey = tsList[j]; doKeepTuple(pInfo, tsList[j]);
pInfo->prevTs = tsList[j];
pInfo->numOfRows = 1;
pInfo->start = j;
} else if (tsList[j] - pInfo->prevTs <= gap && (tsList[j] - pInfo->prevTs) >= 0) { } else if (tsList[j] - pInfo->prevTs <= gap && (tsList[j] - pInfo->prevTs) >= 0) {
pInfo->curWindow.ekey = tsList[j]; // The gap is less than the threshold, so it belongs to current session window that has been opened already.
pInfo->prevTs = tsList[j]; doKeepTuple(pInfo, tsList[j]);
pInfo->numOfRows += 1;
// The first row of this block extends a session opened before this call (prevTs was already set),
// so the start row index is moved to the beginning of this block.
if (j == 0 && pInfo->start != 0) { if (j == 0 && pInfo->start != 0) {
pInfo->numOfRows = 1;
pInfo->start = 0; pInfo->start = 0;
} }
} else { // start a new session window } else { // start a new session window
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
// keep the time window for the closed time window.
STimeWindow window = pInfo->curWindow;
pInfo->curWindow.ekey = pInfo->curWindow.skey; pInfo->curWindow.ekey = pInfo->curWindow.skey;
int32_t ret = setResultOutputBufByKey_rv(&pInfo->binfo.resultRowInfo, pBlock->info.uid, &window, masterScan, int32_t ret = setResultOutputBufByKey_rv(&pInfo->binfo.resultRowInfo, pBlock->info.uid, &window, masterScan,
&pResult, gid, pInfo->binfo.pCtx, numOfOutput, &pResult, gid, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
} }
// pInfo->numOfRows data belong to the current session window // pInfo->numOfRows data belong to the current session window
doApplyFunctions(pInfo->binfo.pCtx, &window, NULL, pInfo->start, pInfo->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); updateTimeWindowInfo(&pInfo->timeWindowData, &window, false);
doApplyFunctions(pInfo->binfo.pCtx, &window, &pInfo->timeWindowData, pInfo->start, pInfo->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
pInfo->curWindow.skey = tsList[j]; // here we start a new session window
pInfo->curWindow.ekey = tsList[j]; doKeepSessionStartInfo(pInfo, tsList, j);
pInfo->prevTs = tsList[j]; doKeepTuple(pInfo, tsList[j]);
pInfo->numOfRows = 1;
pInfo->start = j;
} }
} }
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
// NOTE(review): the next statement indexes rows - 1 with no visible check that the block is non-empty —
// confirm that callers never pass a block with zero rows.
pInfo->curWindow.ekey = tsList[pBlock->info.rows - 1];
pInfo->curWindow.ekey = pInfo->curWindow.skey; int32_t ret = setResultOutputBufByKey_rv(&pInfo->binfo.resultRowInfo, pBlock->info.uid, &pInfo->curWindow, masterScan, &pResult,
int32_t ret = setResultOutputBufByKey_rv(&pInfo->binfo.resultRowInfo, pBlock->info.uid, &window, masterScan, &pResult, gid, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
gid, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset,
&pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
} }
doApplyFunctions(pInfo->binfo.pCtx, &window, NULL, pInfo->start, pInfo->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); updateTimeWindowInfo(&pInfo->timeWindowData, &pInfo->curWindow, false);
doApplyFunctions(pInfo->binfo.pCtx, &pInfo->curWindow, &pInfo->timeWindowData, pInfo->start, pInfo->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
} }
static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) { static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) {
...@@ -7219,8 +7226,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup) ...@@ -7219,8 +7226,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup)
SOptrBasicInfo* pBInfo = &pInfo->binfo; SOptrBasicInfo* pBInfo = &pInfo->binfo;
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pBInfo->pRes, pBInfo->capacity, toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pBInfo->pRes, pBInfo->capacity, pBInfo->rowCellInfoOffset);
pBInfo->rowCellInfoOffset);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
return NULL; return NULL;
...@@ -7248,14 +7254,11 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup) ...@@ -7248,14 +7254,11 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup)
// restore the value // restore the value
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pBInfo->resultRowInfo); closeAllResultRows(&pBInfo->resultRowInfo);
finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset);
pBInfo->rowCellInfoOffset);
initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo); initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo);
blockDataEnsureCapacity(pBInfo->pRes, pBInfo->capacity); blockDataEnsureCapacity(pBInfo->pRes, pBInfo->capacity);
toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pBInfo->pRes, pBInfo->capacity, toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pBInfo->pRes, pBInfo->capacity, pBInfo->rowCellInfoOffset);
pBInfo->rowCellInfoOffset);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
...@@ -7863,28 +7866,28 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo ...@@ -7863,28 +7866,28 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
} }
int32_t numOfRows = 4096; int32_t numOfRows = 4096;
int32_t code = int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str);
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
initExecTimeWindowInfo(&pInfo->timeWindowData, &pTaskInfo->window);
pInfo->gap = gap; pInfo->gap = gap;
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = pResBlock;
pInfo->prevTs = INT64_MIN; pInfo->prevTs = INT64_MIN;
pInfo->reptScan = false; pInfo->reptScan = false;
pOperator->name = "SessionWindowAggOperator"; pOperator->name = "SessionWindowAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExprInfo; pOperator->pExpr = pExprInfo;
pOperator->numOfOutput = numOfCols; pOperator->numOfOutput = numOfCols;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->getNextFn = doSessionWindowAgg; pOperator->getNextFn = doSessionWindowAgg;
pOperator->closeFn = destroySWindowOperatorInfo; pOperator->closeFn = destroySWindowOperatorInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册