提交 bece2cbd 编写于 作者: H Haojun Liao

<feat>[query]: add the state window operator implementation.

上级 1d51e42b
...@@ -550,21 +550,21 @@ typedef struct SFillOperatorInfo { ...@@ -550,21 +550,21 @@ typedef struct SFillOperatorInfo {
int32_t capacity; int32_t capacity;
} SFillOperatorInfo; } SFillOperatorInfo;
typedef struct SGroupKeys { typedef struct {
char *pData; char *pData;
bool isNull; bool isNull;
int16_t type; int16_t type;
int32_t bytes; int32_t bytes;
}SGroupKeys; } SGroupKeys, SStateKeys;
typedef struct SGroupbyOperatorInfo { typedef struct SGroupbyOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
SArray* pGroupCols; SArray* pGroupCols;
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys> SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
SNode* pCondition; SNode* pCondition;
bool isInit; // denote if current val is initialized or not bool isInit; // denote if current val is initialized or not
char* keyBuf; // group by keys for hash char* keyBuf; // group by keys for hash
int32_t groupKeyLen; // total group by column width int32_t groupKeyLen; // total group by column width
SGroupResInfo groupResInfo; SGroupResInfo groupResInfo;
SAggSupporter aggSup; SAggSupporter aggSup;
} SGroupbyOperatorInfo; } SGroupbyOperatorInfo;
...@@ -592,27 +592,33 @@ typedef struct SPartitionOperatorInfo { ...@@ -592,27 +592,33 @@ typedef struct SPartitionOperatorInfo {
int32_t pageIndex; // page index of current group int32_t pageIndex; // page index of current group
} SPartitionOperatorInfo; } SPartitionOperatorInfo;
typedef struct SWindowRowsSup {
STimeWindow win;
TSKEY prevTs;
int32_t startRowIndex;
int32_t numOfRows;
} SWindowRowsSup;
typedef struct SSessionAggOperatorInfo { typedef struct SSessionAggOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
SAggSupporter aggSup; SAggSupporter aggSup;
SGroupResInfo groupResInfo; SGroupResInfo groupResInfo;
STimeWindow curWindow; // current time window SWindowRowsSup winSup;
TSKEY prevTs; // previous timestamp
int32_t numOfRows; // number of rows
int32_t start; // start row index
bool reptScan; // next round scan bool reptScan; // next round scan
int64_t gap; // session window gap int64_t gap; // session window gap
SColumnInfoData timeWindowData; // query time window info for scalar function execution. SColumnInfoData timeWindowData; // query time window info for scalar function execution.
} SSessionAggOperatorInfo; } SSessionAggOperatorInfo;
typedef struct SStateWindowOperatorInfo { typedef struct SStateWindowOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
STimeWindow curWindow; // current time window SAggSupporter aggSup;
int32_t numOfRows; // number of rows SGroupResInfo groupResInfo;
int32_t colIndex; // start row index SWindowRowsSup winSup;
int32_t start; int32_t colIndex; // start row index
char* prevData; // previous data bool hasKey;
bool reptScan; SStateKeys stateKey;
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
// bool reptScan;
} SStateWindowOperatorInfo; } SStateWindowOperatorInfo;
typedef struct SSortedMergeOperatorInfo { typedef struct SSortedMergeOperatorInfo {
......
...@@ -1637,22 +1637,23 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe ...@@ -1637,22 +1637,23 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
// updateResultRowInfoActiveIndex(pResultRowInfo, pQueryAttr, pRuntimeEnv->current->lastKey); // updateResultRowInfoActiveIndex(pResultRowInfo, pQueryAttr, pRuntimeEnv->current->lastKey);
} }
static void doKeepTuple(SSessionAggOperatorInfo* pInfo, int64_t ts) { static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts) {
pInfo->curWindow.ekey = ts; pRowSup->win.ekey = ts;
pInfo->prevTs = ts; pRowSup->prevTs = ts;
pInfo->numOfRows += 1; pRowSup->numOfRows += 1;
} }
static void doKeepSessionStartInfo(SSessionAggOperatorInfo* pInfo, const int64_t* tsList, int32_t rowIndex) { static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex) {
pInfo->start = rowIndex; pRowSup->startRowIndex = rowIndex;
pInfo->numOfRows = 0; pRowSup->numOfRows = 0;
pInfo->curWindow.skey = tsList[rowIndex]; pRowSup->win.skey = tsList[rowIndex];
} }
// todo handle multiple tables cases. // todo handle multiple tables cases.
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) { static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
// todo find the correct time stamp column slot
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0);
bool masterScan = true; bool masterScan = true;
...@@ -1660,31 +1661,34 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator ...@@ -1660,31 +1661,34 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
int64_t gid = pBlock->info.groupId; int64_t gid = pBlock->info.groupId;
int64_t gap = pInfo->gap; int64_t gap = pInfo->gap;
pInfo->numOfRows = 0;
if (!pInfo->reptScan) { if (!pInfo->reptScan) {
pInfo->reptScan = true; pInfo->reptScan = true;
pInfo->prevTs = INT64_MIN; pInfo->winSup.prevTs = INT64_MIN;
} }
SWindowRowsSup* pRowSup = &pInfo->winSup;
pRowSup->numOfRows = 0;
// In case of ascending or descending order scan data, only one time window needs to be kepted for each table. // In case of ascending or descending order scan data, only one time window needs to be kepted for each table.
TSKEY* tsList = (TSKEY*)pColInfoData->pData; TSKEY* tsList = (TSKEY*)pColInfoData->pData;
for (int32_t j = 0; j < pBlock->info.rows; ++j) { for (int32_t j = 0; j < pBlock->info.rows; ++j) {
if (pInfo->prevTs == INT64_MIN) { if (pInfo->winSup.prevTs == INT64_MIN) {
doKeepSessionStartInfo(pInfo, tsList, j); doKeepNewWindowStartInfo(pRowSup, tsList, j);
doKeepTuple(pInfo, tsList[j]); doKeepTuple(pRowSup, tsList[j]);
} else if (tsList[j] - pInfo->prevTs <= gap && (tsList[j] - pInfo->prevTs) >= 0) { } else if (tsList[j] - pRowSup->prevTs <= gap && (tsList[j] - pRowSup->prevTs) >= 0) {
// The gap is less than the threshold, so it belongs to current session window that has been opened already. // The gap is less than the threshold, so it belongs to current session window that has been opened already.
doKeepTuple(pInfo, tsList[j]); doKeepTuple(pRowSup, tsList[j]);
if (j == 0 && pInfo->start != 0) { if (j == 0 && pRowSup->startRowIndex != 0) {
pInfo->start = 0; pRowSup->startRowIndex = 0;
} }
} else { // start a new session window } else { // start a new session window
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
// keep the time window for the closed time window. // keep the time window for the closed time window.
STimeWindow window = pInfo->curWindow; STimeWindow window = pRowSup->win;
pInfo->curWindow.ekey = pInfo->curWindow.skey; pRowSup->win.ekey = pRowSup->win.skey;
int32_t ret = setResultOutputBufByKey_rv(&pInfo->binfo.resultRowInfo, pBlock->info.uid, &window, masterScan, int32_t ret = setResultOutputBufByKey_rv(&pInfo->binfo.resultRowInfo, pBlock->info.uid, &window, masterScan,
&pResult, gid, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); &pResult, gid, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
...@@ -1693,24 +1697,24 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator ...@@ -1693,24 +1697,24 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
// pInfo->numOfRows data belong to the current session window // pInfo->numOfRows data belong to the current session window
updateTimeWindowInfo(&pInfo->timeWindowData, &window, false); updateTimeWindowInfo(&pInfo->timeWindowData, &window, false);
doApplyFunctions(pInfo->binfo.pCtx, &window, &pInfo->timeWindowData, pInfo->start, pInfo->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); doApplyFunctions(pInfo->binfo.pCtx, &window, &pInfo->timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
// here we start a new session window // here we start a new session window
doKeepSessionStartInfo(pInfo, tsList, j); doKeepNewWindowStartInfo(pRowSup, tsList, j);
doKeepTuple(pInfo, tsList[j]); doKeepTuple(pRowSup, tsList[j]);
} }
} }
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
pInfo->curWindow.ekey = tsList[pBlock->info.rows - 1]; pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
int32_t ret = setResultOutputBufByKey_rv(&pInfo->binfo.resultRowInfo, pBlock->info.uid, &pInfo->curWindow, masterScan, &pResult, int32_t ret = setResultOutputBufByKey_rv(&pInfo->binfo.resultRowInfo, pBlock->info.uid, &pRowSup->win, masterScan, &pResult,
gid, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); gid, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
} }
updateTimeWindowInfo(&pInfo->timeWindowData, &pInfo->curWindow, false); updateTimeWindowInfo(&pInfo->timeWindowData, &pRowSup->win, false);
doApplyFunctions(pInfo->binfo.pCtx, &pInfo->curWindow, &pInfo->timeWindowData, pInfo->start, pInfo->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); doApplyFunctions(pInfo->binfo.pCtx, &pRowSup->win, &pInfo->timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
} }
static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) { static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) {
...@@ -5641,84 +5645,78 @@ static SSDataBlock* doAllSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgr ...@@ -5641,84 +5645,78 @@ static SSDataBlock* doAllSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgr
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} }
// SQInfo* pQInfo = pRuntimeEnv->qinfo;
// pQInfo->summary.firstStageMergeTime += (taosGetTimestampUs() - st);
return pIntervalInfo->binfo.pRes; return pIntervalInfo->binfo.pRes;
} }
static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo* pInfo, SSDataBlock* pSDataBlock) { static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock) {
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableQueryInfo* item = pRuntimeEnv->current;
SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, pInfo->colIndex);
SOptrBasicInfo* pBInfo = &pInfo->binfo; SOptrBasicInfo* pBInfo = &pInfo->binfo;
bool masterScan = IS_MAIN_SCAN(pRuntimeEnv); SColumnInfoData* pStateColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->colIndex);
int16_t bytes = pColInfoData->info.bytes; int64_t gid = pBlock->info.groupId;
int16_t type = pColInfoData->info.type;
SColumnInfoData* pTsColInfoData = taosArrayGet(pSDataBlock->pDataBlock, 0); bool masterScan = true;
TSKEY* tsList = (TSKEY*)pTsColInfoData->pData; int32_t numOfOutput = pOperator->numOfOutput;
if (IS_REPEAT_SCAN(pRuntimeEnv) && !pInfo->reptScan) {
pInfo->reptScan = true; int16_t bytes = pStateColInfoData->info.bytes;
taosMemoryFreeClear(pInfo->prevData); int16_t type = pStateColInfoData->info.type;
}
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0);
TSKEY* tsList = (TSKEY*)pColInfoData->pData;
pInfo->numOfRows = 0; SWindowRowsSup* pRowSup = &pInfo->winSup;
for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) { pRowSup->numOfRows = 0;
char* val = ((char*)pColInfoData->pData) + bytes * j;
if (isNull(val, type)) { for (int32_t j = 0; j < pBlock->info.rows; ++j) {
if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pBlock->pBlockAgg)) {
continue; continue;
} }
if (pInfo->prevData == NULL) {
pInfo->prevData = taosMemoryMalloc(bytes); char* val = colDataGetData(pStateColInfoData, j);
memcpy(pInfo->prevData, val, bytes);
pInfo->numOfRows = 1; if (!pInfo->hasKey) {
pInfo->curWindow.skey = tsList[j]; memcpy(pInfo->stateKey.pData, val, bytes);
pInfo->curWindow.ekey = tsList[j]; pInfo->hasKey = true;
pInfo->start = j;
doKeepNewWindowStartInfo(pRowSup, tsList, j);
} else if (memcmp(pInfo->prevData, val, bytes) == 0) { doKeepTuple(pRowSup, tsList[j]);
pInfo->curWindow.ekey = tsList[j]; } else if (memcmp(pInfo->stateKey.pData, val, bytes) == 0) {
pInfo->numOfRows += 1; doKeepTuple(pRowSup, tsList[j]);
// pInfo->start = j; if (j == 0 && pRowSup->startRowIndex != 0) {
if (j == 0 && pInfo->start != 0) { pRowSup->startRowIndex = 0;
pInfo->numOfRows = 1;
pInfo->start = 0;
} }
} else { } else { // a new state window started
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
pInfo->curWindow.ekey = pInfo->curWindow.skey;
int32_t ret = setResultOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.uid, // keep the time window for the closed time window.
&pInfo->curWindow, masterScan, &pResult, item->groupIndex, pBInfo->pCtx, STimeWindow window = pRowSup->win;
pOperator->numOfOutput, pBInfo->rowCellInfoOffset);
pRowSup->win.ekey = pRowSup->win.skey;
int32_t ret = setResultOutputBufByKey_rv(&pInfo->binfo.resultRowInfo, pBlock->info.uid, &window, masterScan,
&pResult, gid, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
} }
// doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList,
// pSDataBlock->info.rows, pOperator->numOfOutput);
pInfo->curWindow.skey = tsList[j]; updateTimeWindowInfo(&pInfo->timeWindowData, &window, false);
pInfo->curWindow.ekey = tsList[j]; doApplyFunctions(pInfo->binfo.pCtx, &window, &pInfo->timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
memcpy(pInfo->prevData, val, bytes);
pInfo->numOfRows = 1; // here we start a new session window
pInfo->start = j; doKeepNewWindowStartInfo(pRowSup, tsList, j);
doKeepTuple(pRowSup, tsList[j]);
} }
} }
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
pInfo->curWindow.ekey = pInfo->curWindow.skey; int32_t ret = setResultOutputBufByKey_rv(&pInfo->binfo.resultRowInfo, pBlock->info.uid, &pRowSup->win, masterScan, &pResult,
int32_t ret = setResultOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.uid, &pInfo->curWindow, gid, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
masterScan, &pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
pBInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
} }
// doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList, updateTimeWindowInfo(&pInfo->timeWindowData, &pRowSup->win, false);
// pSDataBlock->info.rows, pOperator->numOfOutput); doApplyFunctions(pInfo->binfo.pCtx, &pRowSup->win, &pInfo->timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
} }
static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) { static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) {
...@@ -5726,16 +5724,16 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -5726,16 +5724,16 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) {
return NULL; return NULL;
} }
SStateWindowOperatorInfo* pWindowInfo = pOperator->info; SStateWindowOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SOptrBasicInfo* pBInfo = &pWindowInfo->binfo; SOptrBasicInfo* pBInfo = &pInfo->binfo;
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes); toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pBInfo->pRes, pBInfo->capacity, pBInfo->rowCellInfoOffset);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
// if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { doSetOperatorCompleted(pOperator);
// pOperator->status = OP_EXEC_DONE; return NULL;
// } }
return pBInfo->pRes; return pBInfo->pRes;
} }
...@@ -5753,28 +5751,20 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -5753,28 +5751,20 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) {
break; break;
} }
// setInputDataBlock(pOperator, pBInfo->pCtx, pDataBlock, TSDB_ORDER_ASC); setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order);
// if (pWindowInfo->colIndex == -1) { doStateWindowAggImpl(pOperator, pInfo, pBlock);
// pWindowInfo->colIndex = getGroupbyColumnIndex(pRuntimeEnv->pQueryAttr->pGroupbyExpr, pBlock);
// }
doStateWindowAggImpl(pOperator, pWindowInfo, pBlock);
} }
// restore the value
// pQueryAttr->order.order = order;
// pQueryAttr->window = win;
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pBInfo->resultRowInfo); closeAllResultRows(&pBInfo->resultRowInfo);
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset);
finalizeQueryResult(pBInfo->pCtx, pOperator->numOfOutput);
// initGroupResInfo(&pRuntimeEnv->groupResInfo, &pBInfo->resultRowInfo);
// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
// if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo);
// pOperator->status = OP_EXEC_DONE; blockDataEnsureCapacity(pBInfo->pRes, pBInfo->capacity);
// } toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pBInfo->pRes, pBInfo->capacity, pBInfo->rowCellInfoOffset);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
}
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes; return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
} }
...@@ -6098,7 +6088,7 @@ void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -6098,7 +6088,7 @@ void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput) { void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput) {
SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param; SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput); doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
taosMemoryFreeClear(pInfo->prevData); taosMemoryFreeClear(pInfo->stateKey.pData);
} }
void destroyAggOperatorInfo(void* param, int32_t numOfOutput) { void destroyAggOperatorInfo(void* param, int32_t numOfOutput) {
...@@ -6332,16 +6322,18 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S ...@@ -6332,16 +6322,18 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) {
SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo)); SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
pInfo->colIndex = -1; pInfo->colIndex = -1;
pInfo->reptScan = false;
// pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExpr, numOfCols, 4096, pResBlock, pTaskInfo->id.str);
// pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); pOperator->name = "StateWindowOperator";
pOperator->name = "StateWindowOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW;
// pOperator->operatorType = OP_StateWindow;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
...@@ -6354,6 +6346,10 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf ...@@ -6354,6 +6346,10 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
_error:
pTaskInfo->code = TSDB_CODE_SUCCESS;
return NULL;
} }
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
...@@ -6375,7 +6371,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo ...@@ -6375,7 +6371,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
pInfo->gap = gap; pInfo->gap = gap;
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = pResBlock;
pInfo->prevTs = INT64_MIN; pInfo->winSup.prevTs = INT64_MIN;
pInfo->reptScan = false; pInfo->reptScan = false;
pOperator->name = "SessionWindowAggOperator"; pOperator->name = "SessionWindowAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW;
...@@ -6946,9 +6942,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -6946,9 +6942,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
int32_t num = 0;
if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) { if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
int32_t num = 0;
SProjectPhysiNode* pProjPhyNode = (SProjectPhysiNode*) pPhyNode; SProjectPhysiNode* pProjPhyNode = (SProjectPhysiNode*) pPhyNode;
SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &num); SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &num);
...@@ -6957,8 +6953,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -6957,8 +6953,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SLimit slimit = {.limit = pProjPhyNode->slimit, .offset = pProjPhyNode->soffset}; SLimit slimit = {.limit = pProjPhyNode->slimit, .offset = pProjPhyNode->soffset};
return createProjectOperatorInfo(op, pExprInfo, num, pResBlock, &limit, &slimit, pTaskInfo); return createProjectOperatorInfo(op, pExprInfo, num, pResBlock, &limit, &slimit, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_AGG == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_AGG == type) {
int32_t num = 0;
SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode; SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
...@@ -6972,7 +6966,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -6972,7 +6966,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == type) {
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num); SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
...@@ -6997,7 +6990,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -6997,7 +6990,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) {
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num); SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
return createSessionAggOperatorInfo(op, pExprInfo, num, pResBlock, pSessionNode->gap, pTaskInfo); return createSessionAggOperatorInfo(op, pExprInfo, num, pResBlock, pSessionNode->gap, pTaskInfo);
...@@ -7006,10 +6998,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -7006,10 +6998,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SArray* pColList = extractPartitionColInfo(pPartNode->pPartitionKeys); SArray* pColList = extractPartitionColInfo(pPartNode->pPartitionKeys);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &num); SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &num);
return createPartitionOperatorInfo(op, pExprInfo, num, pResBlock, pColList, pTaskInfo, NULL); return createPartitionOperatorInfo(op, pExprInfo, num, pResBlock, pColList, pTaskInfo, NULL);
} else if (QUERY_NODE_STATE_WINDOW == type) {
SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*) pPhyNode;
SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
return createStatewindowOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo);
} else { } else {
ASSERT(0); ASSERT(0);
} /*else if (pPhyNode->info.type == OP_MultiTableAggregate) { } /*else if (pPhyNode->info.type == OP_MultiTableAggregate) {
......
...@@ -48,7 +48,7 @@ static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** ...@@ -48,7 +48,7 @@ static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char**
SColumn* pCol = taosArrayGet(pGroupColList, i); SColumn* pCol = taosArrayGet(pGroupColList, i);
(*keyLen) += pCol->bytes; (*keyLen) += pCol->bytes;
struct SGroupKeys key = {0}; SGroupKeys key = {0};
key.bytes = pCol->bytes; key.bytes = pCol->bytes;
key.type = pCol->type; key.type = pCol->type;
key.isNull = false; key.isNull = false;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册