timewindowoperator.c 188.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15
#include "executorimpl.h"
16
#include "filter.h"
X
Xiaoyu Wang 已提交
17
#include "function.h"
5
54liuyao 已提交
18
#include "functionMgt.h"
H
Haojun Liao 已提交
19
#include "tcommon.h"
5
54liuyao 已提交
20
#include "tcompare.h"
L
Liu Jicong 已提交
21
#include "tdatablock.h"
H
Haojun Liao 已提交
22
#include "tfill.h"
23
#include "ttime.h"
24

L
Liu Jicong 已提交
25
#define IS_FINAL_OP(op)    ((op)->isFinal)
5
54liuyao 已提交
26
#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL);
H
Haojun Liao 已提交
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51

typedef struct SSessionAggOperatorInfo {
  SOptrBasicInfo     binfo;
  SAggSupporter      aggSup;
  SGroupResInfo      groupResInfo;
  SWindowRowsSup     winSup;
  bool               reptScan;  // next round scan
  int64_t            gap;       // session window gap
  int32_t            tsSlotId;  // primary timestamp slot id
  STimeWindowAggSupp twAggSup;
} SSessionAggOperatorInfo;

typedef struct SStateWindowOperatorInfo {
  SOptrBasicInfo     binfo;
  SAggSupporter      aggSup;
  SExprSupp          scalarSup;
  SGroupResInfo      groupResInfo;
  SWindowRowsSup     winSup;
  SColumn            stateCol;  // start row index
  bool               hasKey;
  SStateKeys         stateKey;
  int32_t            tsSlotId;  // primary timestamp column slot id
  STimeWindowAggSupp twAggSup;
} SStateWindowOperatorInfo;

52 53 54 55 56
typedef enum SResultTsInterpType {
  RESULT_ROW_START_INTERP = 1,
  RESULT_ROW_END_INTERP = 2,
} SResultTsInterpType;

5
54liuyao 已提交
57 58
typedef struct SPullWindowInfo {
  STimeWindow window;
L
Liu Jicong 已提交
59
  uint64_t    groupId;
5
54liuyao 已提交
60
  STimeWindow calWin;
5
54liuyao 已提交
61 62
} SPullWindowInfo;

63 64
typedef struct SOpenWindowInfo {
  SResultRowPosition pos;
L
Liu Jicong 已提交
65
  uint64_t           groupId;
66 67
} SOpenWindowInfo;

68 69
static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo);

L
Liu Jicong 已提交
70 71
static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult,
                                              uint64_t groupId);
72 73
static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult);

X
Xiaoyu Wang 已提交
74
static TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols) { return tsCols == NULL ? win->skey : tsCols[0]; }
75 76 77

static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, bool masterscan,
                                      SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
78
                                      int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
79 80 81 82 83 84 85 86 87 88 89
                                      SExecTaskInfo* pTaskInfo) {
  SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
                                                  masterscan, tableGroupId, pTaskInfo, true, pAggSup);

  if (pResultRow == NULL) {
    *pResult = NULL;
    return TSDB_CODE_SUCCESS;
  }

  // set time window for current result
  pResultRow->win = (*win);
90

91
  *pResult = pResultRow;
92
  setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
93

94 95 96 97 98 99 100 101 102 103 104 105 106
  return TSDB_CODE_SUCCESS;
}

static void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, bool includeEndpoint) {
  int64_t* ts = (int64_t*)pColData->pData;
  int32_t  delta = includeEndpoint ? 1 : 0;

  int64_t duration = pWin->ekey - pWin->skey + delta;
  ts[2] = duration;            // set the duration
  ts[3] = pWin->skey;          // window start key
  ts[4] = pWin->ekey + delta;  // window end key
}

107
static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) {
108 109 110
  pRowSup->win.ekey = ts;
  pRowSup->prevTs = ts;
  pRowSup->numOfRows += 1;
111
  pRowSup->groupId = groupId;
112 113
}

dengyihao's avatar
dengyihao 已提交
114 115
static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex,
                                     uint64_t groupId) {
116 117 118
  pRowSup->startRowIndex = rowIndex;
  pRowSup->numOfRows = 0;
  pRowSup->win.skey = tsList[rowIndex];
119
  pRowSup->groupId = groupId;
120 121
}

5
54liuyao 已提交
122 123
FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey,
                                                   int32_t pos, int32_t order, int64_t* pData) {
124
  int32_t forwardRows = 0;
125 126 127 128

  if (order == TSDB_ORDER_ASC) {
    int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order);
    if (end >= 0) {
129
      forwardRows = end;
130 131

      if (pData[end + pos] == ekey) {
132
        forwardRows += 1;
133 134 135
      }
    }
  } else {
136
    int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order);
137
    if (end >= 0) {
138
      forwardRows = end;
139

140
      if (pData[end + pos] == ekey) {
141
        forwardRows += 1;
142 143
      }
    }
X
Xiaoyu Wang 已提交
144 145 146 147 148 149 150 151
    //    int32_t end = searchFn((char*)pData, pos + 1, ekey, order);
    //    if (end >= 0) {
    //      forwardRows = pos - end;
    //
    //      if (pData[end] == ekey) {
    //        forwardRows += 1;
    //      }
    //    }
152 153
  }

154 155
  assert(forwardRows >= 0);
  return forwardRows;
156 157
}

5
54liuyao 已提交
158
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
  int32_t midPos = -1;
  int32_t numOfRows;

  if (num <= 0) {
    return -1;
  }

  assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);

  TSKEY*  keyList = (TSKEY*)pValue;
  int32_t firstPos = 0;
  int32_t lastPos = num - 1;

  if (order == TSDB_ORDER_DESC) {
    // find the first position which is smaller than the key
    while (1) {
175 176 177 178 179 180 181 182 183 184 185
      if (key >= keyList[firstPos]) return firstPos;
      if (key == keyList[lastPos]) return lastPos;

      if (key < keyList[lastPos]) {
        lastPos += 1;
        if (lastPos >= num) {
          return -1;
        } else {
          return lastPos;
        }
      }
186 187 188 189 190 191

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;

      if (key < keyList[midPos]) {
        firstPos = midPos + 1;
192 193
      } else if (key > keyList[midPos]) {
        lastPos = midPos - 1;
194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228
      } else {
        break;
      }
    }

  } else {
    // find the first position which is bigger than the key
    while (1) {
      if (key <= keyList[firstPos]) return firstPos;
      if (key == keyList[lastPos]) return lastPos;

      if (key > keyList[lastPos]) {
        lastPos = lastPos + 1;
        if (lastPos >= num)
          return -1;
        else
          return lastPos;
      }

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1u) + firstPos;

      if (key < keyList[midPos]) {
        lastPos = midPos - 1;
      } else if (key > keyList[midPos]) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }
  }

  return midPos;
}

X
Xiaoyu Wang 已提交
229 230
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
                                 __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order) {
231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251
  assert(startPos >= 0 && startPos < pDataBlockInfo->rows);

  int32_t num = -1;
  int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);

  if (order == TSDB_ORDER_ASC) {
    if (ekey < pDataBlockInfo->window.ekey && pPrimaryColumn) {
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
      if (item != NULL) {
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
      }
    } else {
      num = pDataBlockInfo->rows - startPos;
      if (item != NULL) {
        item->lastKey = pDataBlockInfo->window.ekey + step;
      }
    }
  } else {  // desc
    if (ekey > pDataBlockInfo->window.skey && pPrimaryColumn) {
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
      if (item != NULL) {
252
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
253 254
      }
    } else {
255
      num = pDataBlockInfo->rows - startPos;
256
      if (item != NULL) {
257
        item->lastKey = pDataBlockInfo->window.ekey + step;
258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288
      }
    }
  }

  assert(num >= 0);
  return num;
}

static void getNextTimeWindow(SInterval* pInterval, int32_t precision, int32_t order, STimeWindow* tw) {
  int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order);
  if (pInterval->intervalUnit != 'n' && pInterval->intervalUnit != 'y') {
    tw->skey += pInterval->sliding * factor;
    tw->ekey = tw->skey + pInterval->interval - 1;
    return;
  }

  int64_t key = tw->skey, interval = pInterval->interval;
  // convert key to second
  key = convertTimePrecision(key, precision, TSDB_TIME_PRECISION_MILLI) / 1000;

  if (pInterval->intervalUnit == 'y') {
    interval *= 12;
  }

  struct tm tm;
  time_t    t = (time_t)key;
  taosLocalTime(&t, &tm);

  int mon = (int)(tm.tm_year * 12 + tm.tm_mon + interval * factor);
  tm.tm_year = mon / 12;
  tm.tm_mon = mon % 12;
wafwerar's avatar
wafwerar 已提交
289
  tw->skey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, precision);
290 291 292 293

  mon = (int)(mon + interval);
  tm.tm_year = mon / 12;
  tm.tm_mon = mon % 12;
wafwerar's avatar
wafwerar 已提交
294
  tw->ekey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, precision);
295 296 297 298

  tw->ekey -= 1;
}

5
54liuyao 已提交
299 300 301 302
void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order) {
  getNextTimeWindow(pInterval, pInterval->precision, order, tw);
}

303 304
void doTimeWindowInterpolation(SArray* pPrevValues, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs,
                               int32_t curRowIndex, TSKEY windowKey, int32_t type, SExprSupp* pSup) {
305
  SqlFunctionCtx* pCtx = pSup->pCtx;
306

307
  int32_t index = 1;
308
  for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
H
Haojun Liao 已提交
309
    if (!fmIsIntervalInterpoFunc(pCtx[k].functionId)) {
310 311 312 313
      pCtx[k].start.key = INT64_MIN;
      continue;
    }

X
Xiaoyu Wang 已提交
314
    SFunctParam*     pParam = &pCtx[k].param[0];
315 316
    SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, pParam->pCol->slotId);

317
    ASSERT(pColInfo->info.type == pParam->pCol->type && curTs != windowKey);
318

319
    double v1 = 0, v2 = 0, v = 0;
320
    if (prevRowIndex == -1) {
321
      SGroupKeys* p = taosArrayGet(pPrevValues, index);
322
      GET_TYPED_DATA(v1, double, pColInfo->info.type, p->pData);
323
    } else {
324
      GET_TYPED_DATA(v1, double, pColInfo->info.type, colDataGetData(pColInfo, prevRowIndex));
325 326
    }

327
    GET_TYPED_DATA(v2, double, pColInfo->info.type, colDataGetData(pColInfo, curRowIndex));
328

329
#if 0
330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348
    if (functionId == FUNCTION_INTERP) {
      if (type == RESULT_ROW_START_INTERP) {
        pCtx[k].start.key = prevTs;
        pCtx[k].start.val = v1;

        pCtx[k].end.key = curTs;
        pCtx[k].end.val = v2;

        if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
          if (prevRowIndex == -1) {
            //            pCtx[k].start.ptr = (char*)pRuntimeEnv->prevRow[index];
          } else {
            pCtx[k].start.ptr = (char*)pColInfo->pData + prevRowIndex * pColInfo->info.bytes;
          }

          pCtx[k].end.ptr = (char*)pColInfo->pData + curRowIndex * pColInfo->info.bytes;
        }
      }
    } else if (functionId == FUNCTION_TWA) {
349 350
#endif

X
Xiaoyu Wang 已提交
351 352 353
    SPoint point1 = (SPoint){.key = prevTs, .val = &v1};
    SPoint point2 = (SPoint){.key = curTs, .val = &v2};
    SPoint point = (SPoint){.key = windowKey, .val = &v};
354

X
Xiaoyu Wang 已提交
355
    taosGetLinearInterpolationVal(&point, TSDB_DATA_TYPE_DOUBLE, &point1, &point2, TSDB_DATA_TYPE_DOUBLE);
356

X
Xiaoyu Wang 已提交
357 358 359 360 361 362
    if (type == RESULT_ROW_START_INTERP) {
      pCtx[k].start.key = point.key;
      pCtx[k].start.val = v;
    } else {
      pCtx[k].end.key = point.key;
      pCtx[k].end.val = v;
363
    }
X
Xiaoyu Wang 已提交
364 365 366

    index += 1;
  }
367
#if 0
368
  }
369
#endif
370 371 372 373 374 375 376 377 378 379 380 381 382 383
}

static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t type) {
  if (type == RESULT_ROW_START_INTERP) {
    for (int32_t k = 0; k < numOfOutput; ++k) {
      pCtx[k].start.key = INT64_MIN;
    }
  } else {
    for (int32_t k = 0; k < numOfOutput; ++k) {
      pCtx[k].end.key = INT64_MIN;
    }
  }
}

384 385
static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, int32_t pos, SSDataBlock* pBlock,
                                              const TSKEY* tsCols, STimeWindow* win, SExprSupp* pSup) {
386
  bool ascQuery = (pInfo->inputOrder == TSDB_ORDER_ASC);
387

388
  TSKEY curTs = tsCols[pos];
389 390

  SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
X
Xiaoyu Wang 已提交
391
  TSKEY       lastTs = *(int64_t*)pTsKey->pData;
392 393 394 395 396

  // lastTs == INT64_MIN and pos == 0 means this is the first time window, interpolation is not needed.
  // start exactly from this point, no need to do interpolation
  TSKEY key = ascQuery ? win->skey : win->ekey;
  if (key == curTs) {
397
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
398 399 400
    return true;
  }

401 402
  // it is the first time window, no need to do interpolation
  if (pTsKey->isNull && pos == 0) {
403
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
404 405
  } else {
    TSKEY prevTs = ((pos == 0) ? lastTs : tsCols[pos - 1]);
406 407
    doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, pos - 1, curTs, pos, key,
                              RESULT_ROW_START_INTERP, pSup);
408 409 410 411 412
  }

  return true;
}

413 414 415
static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex,
                                            SArray* pDataBlock, const TSKEY* tsCols, TSKEY blockEkey,
                                            STimeWindow* win) {
416
  int32_t order = pInfo->inputOrder;
417 418

  TSKEY actualEndKey = tsCols[endRowIndex];
419
  TSKEY key = (order == TSDB_ORDER_ASC) ? win->ekey : win->skey;
420 421

  // not ended in current data block, do not invoke interpolation
422
  if ((key > blockEkey && (order == TSDB_ORDER_ASC)) || (key < blockEkey && (order == TSDB_ORDER_DESC))) {
423
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
424 425 426
    return false;
  }

427
  // there is actual end point of current time window, no interpolation needs
428
  if (key == actualEndKey) {
429
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
430 431 432
    return true;
  }

433
  int32_t nextRowIndex = endRowIndex + 1;
434 435 436
  assert(nextRowIndex >= 0);

  TSKEY nextKey = tsCols[nextRowIndex];
437 438
  doTimeWindowInterpolation(pInfo->pPrevValues, pDataBlock, actualEndKey, endRowIndex, nextKey, nextRowIndex, key,
                            RESULT_ROW_END_INTERP, pSup);
439 440 441
  return true;
}

442 443
bool inCalSlidingWindow(SInterval* pInterval, STimeWindow* pWin, TSKEY calStart, TSKEY calEnd) {
  if (pInterval->interval != pInterval->sliding && (pWin->ekey < calStart || pWin->skey > calEnd)) {
5
54liuyao 已提交
444 445 446 447 448
    return false;
  }
  return true;
}

449 450 451 452
bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo) {
  return inCalSlidingWindow(pInterval, pWin, pBlockInfo->calWin.skey, pBlockInfo->calWin.ekey);
}

453
static int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
5
54liuyao 已提交
454
                                      TSKEY* primaryKeys, int32_t prevPosition, int32_t order) {
X
Xiaoyu Wang 已提交
455
  bool ascQuery = (order == TSDB_ORDER_ASC);
456 457 458 459 460 461 462 463 464 465

  int32_t precision = pInterval->precision;
  getNextTimeWindow(pInterval, precision, order, pNext);

  // next time window is not in current block
  if ((pNext->skey > pDataBlockInfo->window.ekey && order == TSDB_ORDER_ASC) ||
      (pNext->ekey < pDataBlockInfo->window.skey && order == TSDB_ORDER_DESC)) {
    return -1;
  }

5
54liuyao 已提交
466 467 468 469
  if (!inSlidingWindow(pInterval, pNext, pDataBlockInfo) && order == TSDB_ORDER_ASC) {
    return -1;
  }

470
  TSKEY   skey = ascQuery ? pNext->skey : pNext->ekey;
471 472 473 474
  int32_t startPos = 0;

  // tumbling time window query, a special case of sliding time window query
  if (pInterval->sliding == pInterval->interval && prevPosition != -1) {
475
    startPos = prevPosition + 1;
476
  } else {
477
    if ((skey <= pDataBlockInfo->window.skey && ascQuery) || (skey >= pDataBlockInfo->window.ekey && !ascQuery)) {
478 479
      startPos = 0;
    } else {
480
      startPos = binarySearchForKey((char*)primaryKeys, pDataBlockInfo->rows, skey, order);
481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523
    }
  }

  /* interp query with fill should not skip time window */
  //  if (pQueryAttr->pointInterpQuery && pQueryAttr->fillType != TSDB_FILL_NONE) {
  //    return startPos;
  //  }

  /*
   * This time window does not cover any data, try next time window,
   * this case may happen when the time window is too small
   */
  if (primaryKeys == NULL) {
    if (ascQuery) {
      assert(pDataBlockInfo->window.skey <= pNext->ekey);
    } else {
      assert(pDataBlockInfo->window.ekey >= pNext->skey);
    }
  } else {
    if (ascQuery && primaryKeys[startPos] > pNext->ekey) {
      TSKEY next = primaryKeys[startPos];
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
        pNext->skey = taosTimeTruncate(next, pInterval, precision);
        pNext->ekey = taosTimeAdd(pNext->skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
      } else {
        pNext->ekey += ((next - pNext->ekey + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
        pNext->skey = pNext->ekey - pInterval->interval + 1;
      }
    } else if ((!ascQuery) && primaryKeys[startPos] < pNext->skey) {
      TSKEY next = primaryKeys[startPos];
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
        pNext->skey = taosTimeTruncate(next, pInterval, precision);
        pNext->ekey = taosTimeAdd(pNext->skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
      } else {
        pNext->skey -= ((pNext->skey - next + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
        pNext->ekey = pNext->skey + pInterval->interval - 1;
      }
    }
  }

  return startPos;
}

524
static bool isResultRowInterpolated(SResultRow* pResult, SResultTsInterpType type) {
525
  ASSERT(pResult != NULL && (type == RESULT_ROW_START_INTERP || type == RESULT_ROW_END_INTERP));
526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541
  if (type == RESULT_ROW_START_INTERP) {
    return pResult->startInterp == true;
  } else {
    return pResult->endInterp == true;
  }
}

static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
  assert(pResult != NULL && (type == RESULT_ROW_START_INTERP || type == RESULT_ROW_END_INTERP));
  if (type == RESULT_ROW_START_INTERP) {
    pResult->startInterp = true;
  } else {
    pResult->endInterp = true;
  }
}

542 543
static void doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataBlock* pBlock, SResultRow* pResult,
                                        STimeWindow* win, int32_t startPos, int32_t forwardRows, SExprSupp* pSup) {
544
  if (!pInfo->timeWindowInterpo) {
545 546 547
    return;
  }

548
  ASSERT(pBlock != NULL);
549 550 551 552 553
  if (pBlock->pDataBlock == NULL) {
    //    tscError("pBlock->pDataBlock == NULL");
    return;
  }

554
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
555 556

  TSKEY* tsCols = (TSKEY*)(pColInfo->pData);
557
  bool   done = isResultRowInterpolated(pResult, RESULT_ROW_START_INTERP);
558
  if (!done) {  // it is not interpolated, now start to generated the interpolated value
559
    bool interp = setTimeWindowInterpolationStartTs(pInfo, startPos, pBlock, tsCols, win, pSup);
560 561 562 563
    if (interp) {
      setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
    }
  } else {
564
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
565 566 567 568 569 570 571 572
  }

  // point interpolation does not require the end key time window interpolation.
  //  if (pointInterpQuery) {
  //    return;
  //  }

  // interpolation query does not generate the time window end interpolation
573
  done = isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
574
  if (!done) {
575
    int32_t endRowIndex = startPos + forwardRows - 1;
576

577
    TSKEY endKey = (pInfo->inputOrder == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey;
578
    bool  interp = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, pBlock->pDataBlock, tsCols, endKey, win);
579 580 581 582
    if (interp) {
      setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
    }
  } else {
583
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
584 585 586
  }
}

587 588
static void saveDataBlockLastRow(SArray* pPrevKeys, const SSDataBlock* pBlock, SArray* pCols) {
  if (pBlock->pDataBlock == NULL) {
589 590 591
    return;
  }

592 593 594 595 596 597 598
  size_t num = taosArrayGetSize(pPrevKeys);
  for (int32_t k = 0; k < num; ++k) {
    SColumn* pc = taosArrayGet(pCols, k);

    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pc->slotId);

    SGroupKeys* pkey = taosArrayGet(pPrevKeys, k);
X
Xiaoyu Wang 已提交
599
    for (int32_t i = pBlock->info.rows - 1; i >= 0; --i) {
600 601 602 603 604 605 606
      if (colDataIsNull_s(pColInfo, i)) {
        continue;
      }

      char* val = colDataGetData(pColInfo, i);
      if (IS_VAR_DATA_TYPE(pkey->type)) {
        memcpy(pkey->pData, val, varDataTLen(val));
607
        ASSERT(varDataTLen(val) <= pkey->bytes);
608 609 610 611 612 613
      } else {
        memcpy(pkey->pData, val, pkey->bytes);
      }

      break;
    }
614 615 616
  }
}

617 618 619 620
static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t numOfExprs, SResultRowInfo* pResultRowInfo,
                                       SSDataBlock* pBlock, int32_t scanFlag, int64_t* tsCols, SResultRowPosition* p) {
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;

621
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
622
  SExprSupp*                pSup = &pOperatorInfo->exprSupp;
623

L
Liu Jicong 已提交
624 625
  int32_t startPos = 0;
  int32_t numOfOutput = pSup->numOfExprs;
626

627
  SResultRow* pResult = NULL;
628

629
  while (1) {
L
Liu Jicong 已提交
630 631 632
    SListNode*          pn = tdListGetHead(pResultRowInfo->openWindow);
    SOpenWindowInfo*    pOpenWin = (SOpenWindowInfo*)pn->data;
    uint64_t            groupId = pOpenWin->groupId;
633
    SResultRowPosition* p1 = &pOpenWin->pos;
634 635 636
    if (p->pageId == p1->pageId && p->offset == p1->offset) {
      break;
    }
637

638
    SResultRow* pr = getResultRowByPos(pInfo->aggSup.pResultBuf, p1, false);
639 640 641 642
    if (NULL == pr) {
      T_LONG_JMP(pTaskInfo->env, terrno);
    }
    
643
    ASSERT(pr->offset == p1->offset && pr->pageId == p1->pageId);
644

645
    if (pr->closed) {
646
      ASSERT(isResultRowInterpolated(pr, RESULT_ROW_START_INTERP) &&
X
Xiaoyu Wang 已提交
647
             isResultRowInterpolated(pr, RESULT_ROW_END_INTERP));
648 649
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
      taosMemoryFree(pNode);
650 651
      continue;
    }
652

653
    STimeWindow w = pr->win;
654 655
    int32_t     ret = setTimeWindowOutputBuf(pResultRowInfo, &w, (scanFlag == MAIN_SCAN), &pResult, groupId, pSup->pCtx,
                                             numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
656
    if (ret != TSDB_CODE_SUCCESS) {
S
Shengliang Guan 已提交
657
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
658 659
    }

660
    ASSERT(!isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
661

X
Xiaoyu Wang 已提交
662 663
    SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
    int64_t     prevTs = *(int64_t*)pTsKey->pData;
H
Haojun Liao 已提交
664
    if (groupId == pBlock->info.id.groupId) {
665 666 667
      doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, -1, tsCols[startPos], startPos, w.ekey,
                                RESULT_ROW_END_INTERP, pSup);
    }
668 669

    setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
670
    setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP);
671

672
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, true);
L
Liu Jicong 已提交
673 674
    applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0,
                                    pBlock->info.rows, numOfExprs);
675 676 677

    if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
      closeResultRow(pr);
678 679
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
      taosMemoryFree(pNode);
X
Xiaoyu Wang 已提交
680
    } else {  // the remains are can not be closed yet.
681
      break;
682
    }
683
  }
684
}
685

5
54liuyao 已提交
686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741
typedef int32_t (*__compare_fn_t)(void* pKey, void* data, int32_t index);

int32_t binarySearchCom(void* keyList, int num, void* pKey, int order, __compare_fn_t comparefn) {
  int firstPos = 0, lastPos = num - 1, midPos = -1;
  int numOfRows = 0;

  if (num <= 0) return -1;
  if (order == TSDB_ORDER_DESC) {
    // find the first position which is smaller or equal than the key
    while (1) {
      if (comparefn(pKey, keyList, lastPos) >= 0) return lastPos;
      if (comparefn(pKey, keyList, firstPos) == 0) return firstPos;
      if (comparefn(pKey, keyList, firstPos) < 0) return firstPos - 1;

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;

      if (comparefn(pKey, keyList, midPos) < 0) {
        lastPos = midPos - 1;
      } else if (comparefn(pKey, keyList, midPos) > 0) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }

  } else {
    // find the first position which is bigger or equal than the key
    while (1) {
      if (comparefn(pKey, keyList, firstPos) <= 0) return firstPos;
      if (comparefn(pKey, keyList, lastPos) == 0) return lastPos;

      if (comparefn(pKey, keyList, lastPos) > 0) {
        lastPos = lastPos + 1;
        if (lastPos >= num)
          return -1;
        else
          return lastPos;
      }

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;

      if (comparefn(pKey, keyList, midPos) < 0) {
        lastPos = midPos - 1;
      } else if (comparefn(pKey, keyList, midPos) > 0) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }
  }

  return midPos;
}

5
54liuyao 已提交
742
typedef int64_t (*__get_value_fn_t)(void* data, int32_t index);
743

X
Xiaoyu Wang 已提交
744 745 746
int32_t binarySearch(void* keyList, int num, TSKEY key, int order, __get_value_fn_t getValuefn) {
  int firstPos = 0, lastPos = num - 1, midPos = -1;
  int numOfRows = 0;
5
54liuyao 已提交
747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792

  if (num <= 0) return -1;
  if (order == TSDB_ORDER_DESC) {
    // find the first position which is smaller or equal than the key
    while (1) {
      if (key >= getValuefn(keyList, lastPos)) return lastPos;
      if (key == getValuefn(keyList, firstPos)) return firstPos;
      if (key < getValuefn(keyList, firstPos)) return firstPos - 1;

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;

      if (key < getValuefn(keyList, midPos)) {
        lastPos = midPos - 1;
      } else if (key > getValuefn(keyList, midPos)) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }

  } else {
    // find the first position which is bigger or equal than the key
    while (1) {
      if (key <= getValuefn(keyList, firstPos)) return firstPos;
      if (key == getValuefn(keyList, lastPos)) return lastPos;

      if (key > getValuefn(keyList, lastPos)) {
        lastPos = lastPos + 1;
        if (lastPos >= num)
          return -1;
        else
          return lastPos;
      }

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;

      if (key < getValuefn(keyList, midPos)) {
        lastPos = midPos - 1;
      } else if (key > getValuefn(keyList, midPos)) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }
793 794
  }

5
54liuyao 已提交
795 796 797
  return midPos;
}

5
54liuyao 已提交
798
int32_t comparePullWinKey(void* pKey, void* data, int32_t index) {
L
Liu Jicong 已提交
799
  SArray*          res = (SArray*)data;
5
54liuyao 已提交
800
  SPullWindowInfo* pos = taosArrayGet(res, index);
L
Liu Jicong 已提交
801
  SPullWindowInfo* pData = (SPullWindowInfo*)pKey;
5
54liuyao 已提交
802
  if (pData->groupId > pos->groupId) {
5
54liuyao 已提交
803
    return 1;
5
54liuyao 已提交
804 805 806 807 808 809 810 811
  } else if (pData->groupId < pos->groupId) {
    return -1;
  }

  if (pData->window.skey > pos->window.ekey) {
    return 1;
  } else if (pData->window.ekey < pos->window.skey) {
    return -1;
5
54liuyao 已提交
812
  }
5
54liuyao 已提交
813
  return 0;
5
54liuyao 已提交
814 815 816 817 818 819 820 821
}

static int32_t savePullWindow(SPullWindowInfo* pPullInfo, SArray* pPullWins) {
  int32_t size = taosArrayGetSize(pPullWins);
  int32_t index = binarySearchCom(pPullWins, size, pPullInfo, TSDB_ORDER_DESC, comparePullWinKey);
  if (index == -1) {
    index = 0;
  } else {
5
54liuyao 已提交
822 823
    int32_t code = comparePullWinKey(pPullInfo, pPullWins, index);
    if (code == 0) {
L
Liu Jicong 已提交
824
      SPullWindowInfo* pos = taosArrayGet(pPullWins, index);
5
54liuyao 已提交
825 826 827 828
      pos->window.skey = TMIN(pos->window.skey, pPullInfo->window.skey);
      pos->window.ekey = TMAX(pos->window.ekey, pPullInfo->window.ekey);
      pos->calWin.skey = TMIN(pos->calWin.skey, pPullInfo->calWin.skey);
      pos->calWin.ekey = TMAX(pos->calWin.ekey, pPullInfo->calWin.ekey);
5
54liuyao 已提交
829
      return TSDB_CODE_SUCCESS;
L
Liu Jicong 已提交
830
    } else if (code > 0) {
5
54liuyao 已提交
831
      index++;
5
54liuyao 已提交
832 833 834 835 836 837 838 839
    }
  }
  if (taosArrayInsert(pPullWins, index, pPullInfo) == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  return TSDB_CODE_SUCCESS;
}

5
54liuyao 已提交
840 841 842
static int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated) {
  winInfo.sessionWin.win.ekey = winInfo.sessionWin.win.skey;
  return tSimpleHashPut(pStUpdated, &winInfo.sessionWin, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo));
5
54liuyao 已提交
843 844
}

5
54liuyao 已提交
845 846 847 848 849 850 851 852
static int32_t saveWinResult(int64_t ts, int32_t pageId, int32_t offset, uint64_t groupId, SHashObj* pUpdatedMap) {
  SResKeyPos* newPos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
  if (newPos == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  newPos->groupId = groupId;
  newPos->pos = (SResultRowPosition){.pageId = pageId, .offset = offset};
  *(int64_t*)newPos->key = ts;
H
Haojun Liao 已提交
853 854
  SWinKey key = {.ts = ts, .groupId = groupId};
  if (taosHashPut(pUpdatedMap, &key, sizeof(SWinKey), &newPos, sizeof(void*)) != TSDB_CODE_SUCCESS) {
5
54liuyao 已提交
855 856 857
    taosMemoryFree(newPos);
  }
  return TSDB_CODE_SUCCESS;
5
54liuyao 已提交
858 859
}

5
54liuyao 已提交
860 861 862 863
static int32_t saveWinResultInfo(TSKEY ts, uint64_t groupId, SHashObj* pUpdatedMap) {
  return saveWinResult(ts, -1, -1, groupId, pUpdatedMap);
}

5
54liuyao 已提交
864
static void removeResults(SArray* pWins, SHashObj* pUpdatedMap) {
5
54liuyao 已提交
865 866
  int32_t size = taosArrayGetSize(pWins);
  for (int32_t i = 0; i < size; i++) {
H
Haojun Liao 已提交
867
    SWinKey* pW = taosArrayGet(pWins, i);
5
54liuyao 已提交
868 869 870 871 872 873
    void*    tmp = taosHashGet(pUpdatedMap, pW, sizeof(SWinKey));
    if (tmp) {
      void* value = *(void**)tmp;
      taosMemoryFree(value);
      taosHashRemove(pUpdatedMap, pW, sizeof(SWinKey));
    }
5
54liuyao 已提交
874 875 876
  }
}

5
54liuyao 已提交
877 878
int32_t compareWinRes(void* pKey, void* data, int32_t index) {
  SArray*     res = (SArray*)data;
5
54liuyao 已提交
879 880 881
  SWinKey*    pDataPos = taosArrayGet(res, index);
  SResKeyPos* pRKey = (SResKeyPos*)pKey;
  if (pRKey->groupId > pDataPos->groupId) {
5
54liuyao 已提交
882
    return 1;
5
54liuyao 已提交
883 884
  } else if (pRKey->groupId < pDataPos->groupId) {
    return -1;
5
54liuyao 已提交
885
  }
L
Liu Jicong 已提交
886

5
54liuyao 已提交
887 888
  if (*(int64_t*)pRKey->key > pDataPos->ts) {
    return 1;
L
Liu Jicong 已提交
889
  } else if (*(int64_t*)pRKey->key < pDataPos->ts) {
5
54liuyao 已提交
890 891 892
    return -1;
  }
  return 0;
5
54liuyao 已提交
893 894
}

5
54liuyao 已提交
895
static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) {
5
54liuyao 已提交
896 897
  taosArraySort(pDelWins, winKeyCmprImpl);
  taosArrayRemoveDuplicate(pDelWins, winKeyCmprImpl, NULL);
L
Liu Jicong 已提交
898 899
  int32_t delSize = taosArrayGetSize(pDelWins);
  if (taosHashGetSize(pUpdatedMap) == 0 || delSize == 0) {
5
54liuyao 已提交
900
    return;
dengyihao's avatar
dengyihao 已提交
901
  }
L
Liu Jicong 已提交
902
  void* pIte = NULL;
5
54liuyao 已提交
903
  while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) {
904
    SResKeyPos* pResKey = *(SResKeyPos**)pIte;
5
54liuyao 已提交
905 906
    int32_t     index = binarySearchCom(pDelWins, delSize, pResKey, TSDB_ORDER_DESC, compareWinRes);
    if (index >= 0 && 0 == compareWinRes(pResKey, pDelWins, index)) {
907
      taosArrayRemove(pDelWins, index);
908
      delSize = taosArrayGetSize(pDelWins);
909 910 911 912
    }
  }
}

5
54liuyao 已提交
913
bool isOverdue(TSKEY ekey, STimeWindowAggSupp* pTwSup) {
5
54liuyao 已提交
914
  ASSERTS(pTwSup->maxTs == INT64_MIN || pTwSup->maxTs > 0, "maxts should greater than 0");
5
54liuyao 已提交
915
  return pTwSup->maxTs != INT64_MIN && ekey < pTwSup->maxTs - pTwSup->waterMark;
5
54liuyao 已提交
916 917
}

5
54liuyao 已提交
918 919 920 921 922
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pTwSup) { return isOverdue(pWin->ekey, pTwSup); }

bool needDeleteWindowBuf(STimeWindow* pWin, STimeWindowAggSupp* pTwSup) {
  return pTwSup->maxTs != INT64_MIN && pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark;
}
5
54liuyao 已提交
923

5
54liuyao 已提交
924
static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
925
                            int32_t scanFlag) {
926
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
927

928
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
929
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;
930

X
Xiaoyu Wang 已提交
931
  int32_t     startPos = 0;
932
  int32_t     numOfOutput = pSup->numOfExprs;
X
Xiaoyu Wang 已提交
933
  int64_t*    tsCols = extractTsCol(pBlock, pInfo);
H
Haojun Liao 已提交
934
  uint64_t    tableGroupId = pBlock->info.id.groupId;
935
  bool        ascScan = (pInfo->inputOrder == TSDB_ORDER_ASC);
X
Xiaoyu Wang 已提交
936 937
  TSKEY       ts = getStartTsKey(&pBlock->info.window, tsCols);
  SResultRow* pResult = NULL;
938

939 940
  STimeWindow win =
      getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->inputOrder);
941 942
  int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
943
  if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
S
Shengliang Guan 已提交
944
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
945
  }
X
Xiaoyu Wang 已提交
946 947
  TSKEY   ekey = ascScan ? win.ekey : win.skey;
  int32_t forwardRows =
948
      getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->inputOrder);
949
  ASSERT(forwardRows > 0);
950 951

  // prev time window not interpolation yet.
952
  if (pInfo->timeWindowInterpo) {
953
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId);
954
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
955 956

    // restore current time window
957 958
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
                                 numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
959
    if (ret != TSDB_CODE_SUCCESS) {
S
Shengliang Guan 已提交
960
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
961 962
    }

963
    // window start key interpolation
964
    doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup);
965
  }
966

967
  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true);
L
Liu Jicong 已提交
968 969
  applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
                                  pBlock->info.rows, numOfOutput);
970 971

  doCloseWindow(pResultRowInfo, pInfo, pResult);
972 973 974

  STimeWindow nextWin = win;
  while (1) {
975
    int32_t prevEndPos = forwardRows - 1 + startPos;
976
    startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, pInfo->inputOrder);
977 978 979 980
    if (startPos < 0) {
      break;
    }
    // null data, failed to allocate more memory buffer
X
Xiaoyu Wang 已提交
981
    int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
982
                                          pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
983
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
S
Shengliang Guan 已提交
984
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
985 986
    }

X
Xiaoyu Wang 已提交
987
    ekey = ascScan ? nextWin.ekey : nextWin.skey;
988
    forwardRows =
989
        getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->inputOrder);
990
    // window start(end) key interpolation
991
    doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
L
Liu Jicong 已提交
992
    // TODO: add to open window? how to close the open windows after input blocks exhausted?
S
shenglian zhou 已提交
993
#if 0
994 995 996 997
    if ((ascScan && ekey <= pBlock->info.window.ekey) ||
        (!ascScan && ekey >= pBlock->info.window.skey)) {
      // window start(end) key interpolation
      doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
998
    } else if (pInfo->timeWindowInterpo) {
999 1000
      addToOpenWindowList(pResultRowInfo, pResult, tableGroupId);
    }
S
shenglian zhou 已提交
1001
#endif
1002
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
L
Liu Jicong 已提交
1003 1004
    applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
                                    pBlock->info.rows, numOfOutput);
1005
    doCloseWindow(pResultRowInfo, pInfo, pResult);
1006 1007 1008
  }

  if (pInfo->timeWindowInterpo) {
1009
    saveDataBlockLastRow(pInfo->pPrevValues, pBlock, pInfo->pInterpCols);
1010
  }
1011 1012 1013 1014 1015 1016
}

void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult) {
  // current result is done in computing final results.
  if (pInfo->timeWindowInterpo && isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
    closeResultRow(pResult);
1017
    SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
D
dapan1121 已提交
1018
    taosMemoryFree(pNode);
1019 1020 1021
  }
}

1022 1023 1024 1025 1026
SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId) {
  SOpenWindowInfo openWin = {0};
  openWin.pos.pageId = pResult->pageId;
  openWin.pos.offset = pResult->offset;
  openWin.groupId = groupId;
L
Liu Jicong 已提交
1027
  SListNode* pn = tdListGetTail(pResultRowInfo->openWindow);
1028
  if (pn == NULL) {
1029 1030
    tdListAppend(pResultRowInfo->openWindow, &openWin);
    return openWin.pos;
1031 1032
  }

L
Liu Jicong 已提交
1033
  SOpenWindowInfo* px = (SOpenWindowInfo*)pn->data;
1034 1035
  if (px->pos.pageId != openWin.pos.pageId || px->pos.offset != openWin.pos.offset || px->groupId != openWin.groupId) {
    tdListAppend(pResultRowInfo->openWindow, &openWin);
1036 1037
  }

1038
  return openWin.pos;
1039 1040 1041 1042
}

int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo) {
  TSKEY* tsCols = NULL;
1043

D
dapan1121 已提交
1044
  if (pBlock->pDataBlock != NULL && pBlock->info.dataLoad) {
1045 1046
    SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
    tsCols = (int64_t*)pColDataInfo->pData;
H
Haojun Liao 已提交
1047
    ASSERT(tsCols[0] != 0);
1048

1049 1050 1051 1052 1053 1054
    // no data in primary ts
    if (tsCols[0] == 0 && tsCols[pBlock->info.rows - 1] == 0) {
      return NULL;
    }

    if (tsCols[0] != 0 && (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0)) {
1055 1056 1057 1058 1059
      blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
    }
  }

  return tsCols;
1060 1061 1062 1063 1064 1065 1066
}

static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

1067 1068 1069
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SOperatorInfo* downstream = pOperator->pDownstream[0];

1070
  SIntervalAggOperatorInfo* pInfo = pOperator->info;
1071
  SExprSupp*                pSup = &pOperator->exprSupp;
1072

1073
  int32_t scanFlag = MAIN_SCAN;
1074
  int64_t st = taosGetTimestampUs();
1075 1076

  while (1) {
1077
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
1078 1079 1080 1081
    if (pBlock == NULL) {
      break;
    }

1082
    getTableScanInfo(pOperator, &pInfo->inputOrder, &scanFlag);
1083

1084
    if (pInfo->scalarSupp.pExprInfo != NULL) {
L
Liu Jicong 已提交
1085 1086
      SExprSupp* pExprSup = &pInfo->scalarSupp;
      projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
1087 1088
    }

1089
    // the pDataBlock are always the same one, no need to call this again
1090
    setInputDataBlock(pSup, pBlock, pInfo->inputOrder, scanFlag, true);
1091
    hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag);
1092 1093
  }

1094
  initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->resultTsOrder);
1095
  OPTR_SET_OPENED(pOperator);
1096 1097

  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
1098 1099 1100
  return TSDB_CODE_SUCCESS;
}

1101 1102 1103 1104 1105
static bool compareVal(const char* v, const SStateKeys* pKey) {
  if (IS_VAR_DATA_TYPE(pKey->type)) {
    if (varDataLen(v) != varDataLen(pKey->pData)) {
      return false;
    } else {
D
dapan1121 已提交
1106
      return memcmp(varDataVal(v), varDataVal(pKey->pData), varDataLen(v)) == 0;
1107 1108 1109 1110 1111 1112
    }
  } else {
    return memcmp(pKey->pData, v, pKey->bytes) == 0;
  }
}

1113
static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock) {
L
Liu Jicong 已提交
1114
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
1115
  SExprSupp*     pSup = &pOperator->exprSupp;
1116

1117
  SColumnInfoData* pStateColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->stateCol.slotId);
H
Haojun Liao 已提交
1118
  int64_t          gid = pBlock->info.id.groupId;
1119 1120

  bool    masterScan = true;
1121
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
1122 1123
  int16_t bytes = pStateColInfoData->info.bytes;

1124
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
1125 1126 1127 1128 1129
  TSKEY*           tsList = (TSKEY*)pColInfoData->pData;

  SWindowRowsSup* pRowSup = &pInfo->winSup;
  pRowSup->numOfRows = 0;

1130
  struct SColumnDataAgg* pAgg = NULL;
1131
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
X
Xiaoyu Wang 已提交
1132
    pAgg = (pBlock->pBlockAgg != NULL) ? pBlock->pBlockAgg[pInfo->stateCol.slotId] : NULL;
1133
    if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pAgg)) {
1134 1135 1136 1137 1138
      continue;
    }

    char* val = colDataGetData(pStateColInfoData, j);

1139
    if (gid != pRowSup->groupId || !pInfo->hasKey) {
1140 1141 1142 1143 1144 1145 1146
      // todo extract method
      if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) {
        varDataCopy(pInfo->stateKey.pData, val);
      } else {
        memcpy(pInfo->stateKey.pData, val, bytes);
      }

1147 1148
      pInfo->hasKey = true;

1149 1150
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
      doKeepTuple(pRowSup, tsList[j], gid);
1151
    } else if (compareVal(val, &pInfo->stateKey)) {
1152
      doKeepTuple(pRowSup, tsList[j], gid);
1153 1154 1155 1156 1157 1158 1159 1160 1161 1162
      if (j == 0 && pRowSup->startRowIndex != 0) {
        pRowSup->startRowIndex = 0;
      }
    } else {  // a new state window started
      SResultRow* pResult = NULL;

      // keep the time window for the closed time window.
      STimeWindow window = pRowSup->win;

      pRowSup->win.ekey = pRowSup->win.skey;
1163 1164
      int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
                                           numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1165
      if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
S
Shengliang Guan 已提交
1166
        T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
1167 1168 1169
      }

      updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false);
H
Haojun Liao 已提交
1170
      applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
L
Liu Jicong 已提交
1171
                                      pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
1172 1173

      // here we start a new session window
1174 1175
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
      doKeepTuple(pRowSup, tsList[j], gid);
1176 1177 1178 1179 1180 1181 1182

      // todo extract method
      if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) {
        varDataCopy(pInfo->stateKey.pData, val);
      } else {
        memcpy(pInfo->stateKey.pData, val, bytes);
      }
1183 1184 1185 1186 1187
    }
  }

  SResultRow* pResult = NULL;
  pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
1188 1189
  int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid,
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1190
  if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
S
Shengliang Guan 已提交
1191
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
1192 1193 1194
  }

  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false);
L
Liu Jicong 已提交
1195 1196
  applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
                                  pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
1197 1198
}

H
Hongze Cheng 已提交
1199
static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) {
1200 1201
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
1202 1203 1204
  }

  SStateWindowOperatorInfo* pInfo = pOperator->info;
1205
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
1206

1207 1208 1209
  SExprSupp* pSup = &pOperator->exprSupp;
  int32_t    order = TSDB_ORDER_ASC;
  int64_t    st = taosGetTimestampUs();
1210 1211 1212

  SOperatorInfo* downstream = pOperator->pDownstream[0];
  while (1) {
1213
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
1214 1215 1216 1217
    if (pBlock == NULL) {
      break;
    }

1218
    setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true);
1219 1220
    blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);

1221 1222 1223 1224 1225 1226 1227 1228 1229
    // there is an scalar expression that needs to be calculated right before apply the group aggregation.
    if (pInfo->scalarSup.pExprInfo != NULL) {
      pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
                                              pInfo->scalarSup.numOfExprs, NULL);
      if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
      }
    }

1230 1231 1232
    doStateWindowAggImpl(pOperator, pInfo, pBlock);
  }

X
Xiaoyu Wang 已提交
1233
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
1234
  initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
1235 1236
  pOperator->status = OP_RES_TO_RETURN;

1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250
  return TSDB_CODE_SUCCESS;
}

static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SStateWindowOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
  SOptrBasicInfo*           pBInfo = &pInfo->binfo;

  pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1251
    setOperatorCompleted(pOperator);
1252 1253 1254
    return NULL;
  }

1255
  blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
1256
  while (1) {
1257
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
H
Haojun Liao 已提交
1258
    doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
1259

1260
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
1261
    if (!hasRemain) {
H
Haojun Liao 已提交
1262
      setOperatorCompleted(pOperator);
1263 1264
      break;
    }
1265

1266 1267 1268 1269
    if (pBInfo->pRes->info.rows > 0) {
      break;
    }
  }
1270

1271
  pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
1272
  return (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
1273 1274
}

1275
static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
1276
  SIntervalAggOperatorInfo* pInfo = pOperator->info;
L
Liu Jicong 已提交
1277
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
1278 1279 1280 1281 1282 1283

  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SSDataBlock* pBlock = pInfo->binfo.pRes;
1284 1285 1286 1287
  pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
    return NULL;
  }
1288

1289 1290 1291
  while (1) {
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
    doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
1292

1293 1294
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
    if (!hasRemain) {
H
Haojun Liao 已提交
1295
      setOperatorCompleted(pOperator);
1296
      break;
1297 1298
    }

1299 1300 1301
    if (pBlock->info.rows > 0) {
      break;
    }
1302
  }
1303 1304 1305 1306 1307

  size_t rows = pBlock->info.rows;
  pOperator->resultInfo.totalRows += rows;

  return (rows == 0) ? NULL : pBlock;
1308 1309
}

5
54liuyao 已提交
1310
static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type) {
L
Liu Jicong 已提交
1311
  for (int i = 0; i < num; i++) {
5
54liuyao 已提交
1312 1313
    if (type == STREAM_INVERT) {
      fmSetInvertFunc(pCtx[i].functionId, &(pCtx[i].fpSet));
L
Liu Jicong 已提交
1314
    } else if (type == STREAM_NORMAL) {
5
54liuyao 已提交
1315 1316 1317 1318
      fmSetNormalFunc(pCtx[i].functionId, &(pCtx[i].fpSet));
    }
  }
}
5
54liuyao 已提交
1319

5
54liuyao 已提交
1320
static void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprSupp* pSup, int32_t numOfOutput) {
1321
  SResultRow*     pResult = getResultRowByPos(pResultBuf, p1, false);
1322 1323 1324 1325
  if (NULL == pResult) {
    return;
  }
  
1326
  SqlFunctionCtx* pCtx = pSup->pCtx;
5
54liuyao 已提交
1327
  for (int32_t i = 0; i < numOfOutput; ++i) {
1328
    pCtx[i].resultInfo = getResultEntryInfo(pResult, i, pSup->rowEntryInfoOffset);
5
54liuyao 已提交
1329 1330 1331 1332 1333 1334 1335 1336 1337
    struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo;
    if (fmIsWindowPseudoColumnFunc(pCtx[i].functionId)) {
      continue;
    }
    pResInfo->initialized = false;
    if (pCtx[i].functionId != -1) {
      pCtx[i].fpSet.init(&pCtx[i], pResInfo);
    }
  }
5
54liuyao 已提交
1338
  SFilePage* bufPage = getBufPage(pResultBuf, p1->pageId);
1339 1340 1341
  if (NULL == bufPage) {
    return;
  }
5
54liuyao 已提交
1342 1343
  setBufPageDirty(bufPage, true);
  releaseBufPage(pResultBuf, bufPage);
5
54liuyao 已提交
1344 1345
}

1346
static bool doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId) {
5
54liuyao 已提交
1347 1348 1349
  SStreamIntervalOperatorInfo* pInfo = pOperator->info;
  SWinKey                      key = {.ts = ts, .groupId = groupId};
  tSimpleHashRemove(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey));
1350
  streamStateDel(pInfo->pState, &key);
5
54liuyao 已提交
1351 1352 1353
  return true;
}

1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366
static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDataBlock* pBlock, SArray* pUpWins,
                            SHashObj* pUpdatedMap) {
  SStreamIntervalOperatorInfo* pInfo = pOperator->info;
  SColumnInfoData*             pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  TSKEY*                       startTsCols = (TSKEY*)pStartTsCol->pData;
  SColumnInfoData*             pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  TSKEY*                       endTsCols = (TSKEY*)pEndTsCol->pData;
  SColumnInfoData*             pCalStTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  TSKEY*                       calStTsCols = (TSKEY*)pCalStTsCol->pData;
  SColumnInfoData*             pCalEnTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  TSKEY*                       calEnTsCols = (TSKEY*)pCalEnTsCol->pData;
  SColumnInfoData*             pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  uint64_t*                    pGpDatas = (uint64_t*)pGpCol->pData;
5
54liuyao 已提交
1367
  for (int32_t i = 0; i < pBlock->info.rows; i++) {
H
Haojun Liao 已提交
1368
    SResultRowInfo dumyInfo = {0};
5
54liuyao 已提交
1369
    dumyInfo.cur.pageId = -1;
H
Haojun Liao 已提交
1370

1371 1372 1373 1374 1375 1376 1377 1378
    STimeWindow win = {0};
    if (IS_FINAL_OP(pInfo)) {
      win.skey = startTsCols[i];
      win.ekey = endTsCols[i];
    } else {
      win = getActiveTimeWindow(NULL, &dumyInfo, startTsCols[i], pInterval, TSDB_ORDER_ASC);
    }

5
54liuyao 已提交
1379
    do {
1380 1381 1382 1383
      if (!inCalSlidingWindow(pInterval, &win, calStTsCols[i], calEnTsCols[i])) {
        getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win);
        continue;
      }
5
54liuyao 已提交
1384
      uint64_t winGpId = pGpDatas[i];
1385
      bool     res = doDeleteWindow(pOperator, win.skey, winGpId);
5
54liuyao 已提交
1386 1387 1388 1389 1390
      SWinKey  winRes = {.ts = win.skey, .groupId = winGpId};
      if (pUpWins && res) {
        taosArrayPush(pUpWins, &winRes);
      }
      if (pUpdatedMap) {
5
54liuyao 已提交
1391 1392 1393 1394 1395 1396
        void* tmp = taosHashGet(pUpdatedMap, &winRes, sizeof(SWinKey));
        if (tmp) {
          void* value = *(void**)tmp;
          taosMemoryFree(value);
          taosHashRemove(pUpdatedMap, &winRes, sizeof(SWinKey));
        }
5
54liuyao 已提交
1397 1398
      }
      getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win);
5
54liuyao 已提交
1399
    } while (win.ekey <= endTsCols[i]);
5
54liuyao 已提交
1400 1401 1402
  }
}

1403 1404 1405 1406 1407
static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) {
  void*   pIte = NULL;
  size_t  keyLen = 0;
  int32_t iter = 0;
  while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
L
Liu Jicong 已提交
1408 1409
    void*               key = tSimpleHashGetKey(pIte, &keyLen);
    uint64_t            groupId = *(uint64_t*)key;
1410
    TSKEY               ts = *(int64_t*)((char*)key + sizeof(uint64_t));
5
54liuyao 已提交
1411
    SResultRowPosition* pPos = (SResultRowPosition*)pIte;
5
54liuyao 已提交
1412
    int32_t             code = saveWinResult(ts, pPos->pageId, pPos->offset, groupId, resWins);
5
54liuyao 已提交
1413 1414 1415 1416 1417 1418 1419
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }
  return TSDB_CODE_SUCCESS;
}

1420 1421
int32_t compareWinKey(void* pKey, void* data, int32_t index) {
  SArray*  res = (SArray*)data;
5
54liuyao 已提交
1422 1423 1424 1425
  SWinKey* pDataPos = taosArrayGet(res, index);
  SWinKey* pWKey = (SWinKey*)pKey;

  if (pWKey->groupId > pDataPos->groupId) {
1426
    return 1;
5
54liuyao 已提交
1427 1428
  } else if (pWKey->groupId < pDataPos->groupId) {
    return -1;
1429
  }
5
54liuyao 已提交
1430 1431 1432 1433 1434 1435 1436

  if (pWKey->ts > pDataPos->ts) {
    return 1;
  } else if (pWKey->ts < pDataPos->ts) {
    return -1;
  }
  return 0;
1437 1438
}

5
54liuyao 已提交
1439
static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SInterval* pInterval,
1440 1441
                                         SHashObj* pPullDataMap, SHashObj* closeWins, SArray* pDelWins,
                                         SOperatorInfo* pOperator) {
5
54liuyao 已提交
1442
  qDebug("===stream===close interval window");
1443 1444 1445 1446
  void*                        pIte = NULL;
  size_t                       keyLen = 0;
  int32_t                      iter = 0;
  SStreamIntervalOperatorInfo* pInfo = pOperator->info;
1447
  int32_t                      delSize = taosArrayGetSize(pDelWins);
5
54liuyao 已提交
1448
  while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
1449 1450 1451 1452 1453 1454 1455 1456 1457 1458
    void*    key = tSimpleHashGetKey(pIte, &keyLen);
    SWinKey* pWinKey = (SWinKey*)key;
    if (delSize > 0) {
      int32_t index = binarySearchCom(pDelWins, delSize, pWinKey, TSDB_ORDER_DESC, compareWinKey);
      if (index >= 0 && 0 == compareWinKey(pWinKey, pDelWins, index)) {
        taosArrayRemove(pDelWins, index);
        delSize = taosArrayGetSize(pDelWins);
      }
    }

5
54liuyao 已提交
1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483
    void*       chIds = taosHashGet(pPullDataMap, pWinKey, sizeof(SWinKey));
    STimeWindow win = {
        .skey = pWinKey->ts,
        .ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1,
    };
    if (isCloseWindow(&win, pTwSup)) {
      if (chIds && pPullDataMap) {
        SArray* chAy = *(SArray**)chIds;
        int32_t size = taosArrayGetSize(chAy);
        qDebug("===stream===window %" PRId64 " wait child size:%d", pWinKey->ts, size);
        for (int32_t i = 0; i < size; i++) {
          qDebug("===stream===window %" PRId64 " wait child id:%d", pWinKey->ts, *(int32_t*)taosArrayGet(chAy, i));
        }
        continue;
      } else if (pPullDataMap) {
        qDebug("===stream===close window %" PRId64, pWinKey->ts);
      }

      if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
        int32_t code = saveWinResultInfo(pWinKey->ts, pWinKey->groupId, closeWins);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }
      tSimpleHashIterateRemove(pHashMap, pWinKey, sizeof(SWinKey), &pIte, &iter);
1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528
    }
  }
  return TSDB_CODE_SUCCESS;
}

STimeWindow getFinalTimeWindow(int64_t ts, SInterval* pInterval) {
  STimeWindow w = {.skey = ts, .ekey = INT64_MAX};
  w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
  return w;
}

static void deleteIntervalDiscBuf(SStreamState* pState, SHashObj* pPullDataMap, TSKEY mark, SInterval* pInterval,
                                  SWinKey* key) {
  STimeWindow tw = getFinalTimeWindow(key->ts, pInterval);
  SWinKey     next = {0};
  while (tw.ekey < mark) {
    SStreamStateCur* pCur = streamStateSeekKeyNext(pState, key);
    int32_t          code = streamStateGetKVByCur(pCur, &next, NULL, 0);
    streamStateFreeCur(pCur);

    void* chIds = taosHashGet(pPullDataMap, key, sizeof(SWinKey));
    if (chIds && pPullDataMap) {
      SArray* chAy = *(SArray**)chIds;
      int32_t size = taosArrayGetSize(chAy);
      qDebug("===stream===window %" PRId64 " wait child size:%d", key->ts, size);
      for (int32_t i = 0; i < size; i++) {
        qDebug("===stream===window %" PRId64 " wait child id:%d", key->ts, *(int32_t*)taosArrayGet(chAy, i));
      }
      break;
    }
    qDebug("===stream===delete window %" PRId64, key->ts);
    int32_t codeDel = streamStateDel(pState, key);
    if (codeDel != TSDB_CODE_SUCCESS) {
      code = streamStateGetFirst(pState, key);
      if (code != TSDB_CODE_SUCCESS) {
        qDebug("===stream===stream state first key: empty-empty");
        return;
      }
      continue;
    }
    if (code == TSDB_CODE_SUCCESS) {
      *key = next;
      tw = getFinalTimeWindow(key->ts, pInterval);
    }
  }
5
54liuyao 已提交
1529

5
54liuyao 已提交
1530 1531
  // for debug
  if (qDebugFlag & DEBUG_DEBUG && mark > 0) {
1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544
    SStreamStateCur* pCur = streamStateGetCur(pState, key);
    int32_t          code = streamStateCurPrev(pState, pCur);
    if (code == TSDB_CODE_SUCCESS) {
      SWinKey tmpKey = {0};
      code = streamStateGetKVByCur(pCur, &tmpKey, NULL, 0);
      if (code == TSDB_CODE_SUCCESS) {
        STimeWindow tw = getFinalTimeWindow(tmpKey.ts, pInterval);
        qDebug("===stream===error stream state first key:%" PRId64 "-%" PRId64 ",%" PRId64 ",mark %" PRId64, tw.skey,
               tw.ekey, tmpKey.groupId, mark);
      } else {
        STimeWindow tw = getFinalTimeWindow(key->ts, pInterval);
        qDebug("===stream===stream state first key:%" PRId64 "-%" PRId64 ",%" PRId64 ",mark %" PRId64, tw.skey, tw.ekey,
               key->groupId, mark);
5
54liuyao 已提交
1545
      }
1546 1547 1548 1549
    } else {
      STimeWindow tw = getFinalTimeWindow(key->ts, pInterval);
      qDebug("===stream===stream state first key:%" PRId64 "-%" PRId64 ",%" PRId64 ",mark %" PRId64, tw.skey, tw.ekey,
             key->groupId, mark);
5
54liuyao 已提交
1550
    }
1551
    streamStateFreeCur(pCur);
5
54liuyao 已提交
1552 1553 1554
  }
}

1555
static void closeChildIntervalWindow(SOperatorInfo* pOperator, SArray* pChildren, TSKEY maxTs) {
5
54liuyao 已提交
1556 1557
  int32_t size = taosArrayGetSize(pChildren);
  for (int32_t i = 0; i < size; i++) {
1558 1559
    SOperatorInfo*               pChildOp = taosArrayGetP(pChildren, i);
    SStreamIntervalOperatorInfo* pChInfo = pChildOp->info;
5
54liuyao 已提交
1560
    ASSERTS(pChInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE, "children trigger type should be at once");
5
54liuyao 已提交
1561
    pChInfo->twAggSup.maxTs = TMAX(pChInfo->twAggSup.maxTs, maxTs);
1562
    closeStreamIntervalWindow(pChInfo->aggSup.pResultRowHashTable, &pChInfo->twAggSup, &pChInfo->interval, NULL, NULL,
1563
                              NULL, pOperator);
1564 1565 1566
  }
}

1567 1568
static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWins, int32_t* index,
                                SSDataBlock* pBlock) {
1569 1570 1571 1572 1573 1574 1575 1576
  blockDataCleanup(pBlock);
  int32_t size = taosArrayGetSize(pWins);
  if (*index == size) {
    *index = 0;
    taosArrayClear(pWins);
    return;
  }
  blockDataEnsureCapacity(pBlock, size - *index);
1577
  uint64_t uid = 0;
1578
  for (int32_t i = *index; i < size; i++) {
H
Haojun Liao 已提交
1579
    SWinKey* pWin = taosArrayGet(pWins, i);
1580 1581
    void*    tbname = NULL;
    streamStateGetParName(pInfo->pState, pWin->groupId, &tbname);
1582 1583 1584 1585 1586 1587 1588
    if (tbname == NULL) {
      appendOneRowToStreamSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, NULL);
    } else {
      char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN];
      STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
      appendOneRowToStreamSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, parTbName);
    }
1589
    tdbFree(tbname);
1590
    (*index)++;
5
54liuyao 已提交
1591 1592 1593
  }
}

1594
static void destroyStateWindowOperatorInfo(void* param) {
1595
  SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
1596
  cleanupBasicInfo(&pInfo->binfo);
1597
  taosMemoryFreeClear(pInfo->stateKey.pData);
1598
  cleanupExprSupp(&pInfo->scalarSup);
D
dapan1121 已提交
1599 1600 1601
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
  cleanupAggSup(&pInfo->aggSup);
  cleanupGroupResInfo(&pInfo->groupResInfo);
1602

D
dapan1121 已提交
1603
  taosMemoryFreeClear(param);
1604 1605
}

H
Haojun Liao 已提交
1606
static void freeItem(void* param) {
L
Liu Jicong 已提交
1607
  SGroupKeys* pKey = (SGroupKeys*)param;
H
Haojun Liao 已提交
1608 1609 1610
  taosMemoryFree(pKey->pData);
}

1611
void destroyIntervalOperatorInfo(void* param) {
1612
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param;
1613
  cleanupBasicInfo(&pInfo->binfo);
1614
  cleanupAggSup(&pInfo->aggSup);
1615 1616 1617 1618
  cleanupExprSupp(&pInfo->scalarSupp);

  tdListFree(pInfo->binfo.resultRowInfo.openWindow);

H
Haojun Liao 已提交
1619 1620 1621 1622
  pInfo->pInterpCols = taosArrayDestroy(pInfo->pInterpCols);
  taosArrayDestroyEx(pInfo->pPrevValues, freeItem);

  pInfo->pPrevValues = NULL;
1623

H
Haojun Liao 已提交
1624 1625
  cleanupGroupResInfo(&pInfo->groupResInfo);
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
D
dapan1121 已提交
1626
  taosMemoryFreeClear(param);
1627 1628
}

1629
void destroyStreamFinalIntervalOperatorInfo(void* param) {
1630
  SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)param;
1631
  cleanupBasicInfo(&pInfo->binfo);
5
54liuyao 已提交
1632
  cleanupAggSup(&pInfo->aggSup);
L
Liu Jicong 已提交
1633
  // it should be empty.
5
54liuyao 已提交
1634 1635 1636
  taosHashCleanup(pInfo->pPullDataMap);
  taosArrayDestroy(pInfo->pPullWins);
  blockDataDestroy(pInfo->pPullDataRes);
L
Liu Jicong 已提交
1637 1638
  taosArrayDestroy(pInfo->pDelWins);
  blockDataDestroy(pInfo->pDelRes);
1639
  taosMemoryFreeClear(pInfo->pState);
5
54liuyao 已提交
1640

1641 1642 1643 1644
  if (pInfo->pChildren) {
    int32_t size = taosArrayGetSize(pInfo->pChildren);
    for (int32_t i = 0; i < size; i++) {
      SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, i);
5
54liuyao 已提交
1645
      destroyOperatorInfo(pChildOp);
1646
    }
L
Liu Jicong 已提交
1647
    taosArrayDestroy(pInfo->pChildren);
1648
  }
1649
  nodesDestroyNode((SNode*)pInfo->pPhyNode);
5
54liuyao 已提交
1650
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
5
54liuyao 已提交
1651
  cleanupGroupResInfo(&pInfo->groupResInfo);
5
54liuyao 已提交
1652
  cleanupExprSupp(&pInfo->scalarSupp);
1653

D
dapan1121 已提交
1654
  taosMemoryFreeClear(param);
5
54liuyao 已提交
1655 1656
}

1657
static bool allInvertible(SqlFunctionCtx* pFCtx, int32_t numOfCols) {
5
54liuyao 已提交
1658
  for (int32_t i = 0; i < numOfCols; i++) {
5
54liuyao 已提交
1659
    if (fmIsUserDefinedFunc(pFCtx[i].functionId) || !fmIsInvertible(pFCtx[i].functionId)) {
5
54liuyao 已提交
1660 1661 1662 1663 1664 1665
      return false;
    }
  }
  return true;
}

1666
static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SIntervalAggOperatorInfo* pInfo) {
1667 1668 1669
  // the primary timestamp column
  bool needed = false;

L
Liu Jicong 已提交
1670
  for (int32_t i = 0; i < numOfCols; ++i) {
1671
    SExprInfo* pExpr = pCtx[i].pExpr;
H
Haojun Liao 已提交
1672
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
1673
      needed = true;
H
Haojun Liao 已提交
1674
      break;
1675 1676 1677
    }
  }

H
Haojun Liao 已提交
1678 1679 1680
  if (needed) {
    pInfo->pInterpCols = taosArrayInit(4, sizeof(SColumn));
    pInfo->pPrevValues = taosArrayInit(4, sizeof(SGroupKeys));
1681

H
Haojun Liao 已提交
1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696
    {  // ts column
      SColumn c = {0};
      c.colId = 1;
      c.slotId = pInfo->primaryTsIndex;
      c.type = TSDB_DATA_TYPE_TIMESTAMP;
      c.bytes = sizeof(int64_t);
      taosArrayPush(pInfo->pInterpCols, &c);

      SGroupKeys key;
      key.bytes = c.bytes;
      key.type = c.type;
      key.isNull = true;  // to denote no value is assigned yet
      key.pData = taosMemoryCalloc(1, c.bytes);
      taosArrayPush(pInfo->pPrevValues, &key);
    }
1697 1698
  }

X
Xiaoyu Wang 已提交
1699
  for (int32_t i = 0; i < numOfCols; ++i) {
1700 1701
    SExprInfo* pExpr = pCtx[i].pExpr;

H
Haojun Liao 已提交
1702
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
1703 1704 1705
      SFunctParam* pParam = &pExpr->base.pParam[0];

      SColumn c = *pParam->pCol;
1706
      taosArrayPush(pInfo->pInterpCols, &c);
1707 1708

      SGroupKeys key = {0};
X
Xiaoyu Wang 已提交
1709 1710
      key.bytes = c.bytes;
      key.type = c.type;
1711
      key.isNull = false;
X
Xiaoyu Wang 已提交
1712
      key.pData = taosMemoryCalloc(1, c.bytes);
1713
      taosArrayPush(pInfo->pPrevValues, &key);
1714 1715 1716 1717 1718 1719
    }
  }

  return needed;
}

L
Liu Jicong 已提交
1720
void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SAggSupporter* pSup, SInterval* pInterval,
5
54liuyao 已提交
1721
                            STimeWindowAggSupp* pTwSup) {
1722
  if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
5
54liuyao 已提交
1723
    initIntervalDownStream(downstream->pDownstream[0], type, pSup, pInterval, pTwSup);
1724 1725
    return;
  }
5
54liuyao 已提交
1726
  SStreamScanInfo* pScanInfo = downstream->info;
1727 1728
  pScanInfo->windowSup.parentType = type;
  pScanInfo->windowSup.pIntervalAggSup = pSup;
5
54liuyao 已提交
1729 1730 1731
  if (!pScanInfo->pUpdateInfo) {
    pScanInfo->pUpdateInfo = updateInfoInitP(pInterval, pTwSup->waterMark);
  }
1732
  pScanInfo->interval = *pInterval;
5
54liuyao 已提交
1733
  pScanInfo->twAggSup = *pTwSup;
5
54liuyao 已提交
1734 1735
}

H
Haojun Liao 已提交
1736 1737
void initStreamFunciton(SqlFunctionCtx* pCtx, int32_t numOfExpr) {
  for (int32_t i = 0; i < numOfExpr; i++) {
L
Liu Jicong 已提交
1738
    //    pCtx[i].isStream = true;
H
Haojun Liao 已提交
1739 1740 1741
  }
}

H
Haojun Liao 已提交
1742
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode,
L
Liu Jicong 已提交
1743
                                          SExecTaskInfo* pTaskInfo, bool isStream) {
1744
  SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
L
Liu Jicong 已提交
1745
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
1746 1747 1748 1749
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

H
Haojun Liao 已提交
1750
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->window.node.pOutputDataBlockDesc);
H
Haojun Liao 已提交
1751 1752 1753 1754 1755 1756
  initBasicInfo(&pInfo->binfo, pResBlock);

  SExprSupp* pSup = &pOperator->exprSupp;
  pInfo->primaryTsIndex = ((SColumnNode*)pPhyNode->window.pTspk)->slotId;

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
H
Haojun Liao 已提交
1757 1758
  initResultSizeInfo(&pOperator->resultInfo, 512);
  blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
H
Haojun Liao 已提交
1759 1760 1761

  int32_t    num = 0;
  SExprInfo* pExprInfo = createExprInfo(pPhyNode->window.pFuncs, NULL, &num);
L
Liu Jicong 已提交
1762 1763
  int32_t    code =
      initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState);
H
Haojun Liao 已提交
1764 1765 1766 1767 1768
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  SInterval interval = {.interval = pPhyNode->interval,
1769 1770 1771 1772 1773
                        .sliding = pPhyNode->sliding,
                        .intervalUnit = pPhyNode->intervalUnit,
                        .slidingUnit = pPhyNode->slidingUnit,
                        .offset = pPhyNode->offset,
                        .precision = ((SColumnNode*)pPhyNode->window.pTspk)->node.resType.precision};
H
Haojun Liao 已提交
1774 1775 1776 1777 1778 1779 1780

  STimeWindowAggSupp as = {
      .waterMark = pPhyNode->window.watermark,
      .calTrigger = pPhyNode->window.triggerType,
      .maxTs = INT64_MIN,
  };

L
Liu Jicong 已提交
1781
  pInfo->win = pTaskInfo->window;
1782 1783
  pInfo->inputOrder = (pPhyNode->window.inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
  pInfo->resultTsOrder = (pPhyNode->window.outputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
H
Haojun Liao 已提交
1784 1785
  pInfo->interval = interval;
  pInfo->twAggSup = as;
1786
  pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock;
1787 1788 1789 1790

  if (pPhyNode->window.pExprs != NULL) {
    int32_t    numOfScalar = 0;
    SExprInfo* pScalarExprInfo = createExprInfo(pPhyNode->window.pExprs, NULL, &numOfScalar);
H
Haojun Liao 已提交
1791
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar);
1792 1793 1794 1795
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }
1796

H
Haojun Liao 已提交
1797 1798 1799 1800 1801
  code = filterInitFromNode((SNode*)pPhyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

1802
  initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
H
Haojun Liao 已提交
1803
  pInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, num, pInfo);
1804
  if (pInfo->timeWindowInterpo) {
1805
    pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
H
Haojun Liao 已提交
1806 1807 1808
    if (pInfo->binfo.resultRowInfo.openWindow == NULL) {
      goto _error;
    }
1809
  }
1810

1811
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
L
Liu Jicong 已提交
1812 1813
  setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
1814

L
Liu Jicong 已提交
1815 1816
  pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, destroyIntervalOperatorInfo,
                                         optrDefaultBufFn, NULL);
1817 1818 1819 1820 1821 1822 1823 1824

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

L
Liu Jicong 已提交
1825
_error:
H
Haojun Liao 已提交
1826 1827 1828
  if (pInfo != NULL) {
    destroyIntervalOperatorInfo(pInfo);
  }
1829 1830 1831 1832 1833
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

1834
// todo handle multiple timeline cases. assume no timeline interweaving
1835 1836
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
1837
  SExprSupp*     pSup = &pOperator->exprSupp;
1838

1839
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
1840 1841

  bool    masterScan = true;
1842
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
H
Haojun Liao 已提交
1843
  int64_t gid = pBlock->info.id.groupId;
1844 1845 1846 1847 1848 1849 1850 1851 1852 1853 1854 1855 1856 1857

  int64_t gap = pInfo->gap;

  if (!pInfo->reptScan) {
    pInfo->reptScan = true;
    pInfo->winSup.prevTs = INT64_MIN;
  }

  SWindowRowsSup* pRowSup = &pInfo->winSup;
  pRowSup->numOfRows = 0;

  // In case of ascending or descending order scan data, only one time window needs to be kepted for each table.
  TSKEY* tsList = (TSKEY*)pColInfoData->pData;
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
1858 1859 1860
    if (gid != pRowSup->groupId || pInfo->winSup.prevTs == INT64_MIN) {
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
      doKeepTuple(pRowSup, tsList[j], gid);
H
Haojun Liao 已提交
1861 1862
    } else if (((tsList[j] - pRowSup->prevTs >= 0) && (tsList[j] - pRowSup->prevTs <= gap)) ||
               ((pRowSup->prevTs - tsList[j] >= 0) && (pRowSup->prevTs - tsList[j] <= gap))) {
1863
      // The gap is less than the threshold, so it belongs to current session window that has been opened already.
1864
      doKeepTuple(pRowSup, tsList[j], gid);
1865 1866 1867 1868 1869 1870 1871 1872 1873 1874
      if (j == 0 && pRowSup->startRowIndex != 0) {
        pRowSup->startRowIndex = 0;
      }
    } else {  // start a new session window
      SResultRow* pResult = NULL;

      // keep the time window for the closed time window.
      STimeWindow window = pRowSup->win;

      pRowSup->win.ekey = pRowSup->win.skey;
1875 1876
      int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
                                           numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1877
      if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
S
Shengliang Guan 已提交
1878
        T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
1879 1880 1881 1882
      }

      // pInfo->numOfRows data belong to the current session window
      updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false);
H
Haojun Liao 已提交
1883
      applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
L
Liu Jicong 已提交
1884
                                      pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
1885 1886

      // here we start a new session window
1887 1888
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
      doKeepTuple(pRowSup, tsList[j], gid);
1889 1890 1891 1892 1893
    }
  }

  SResultRow* pResult = NULL;
  pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
1894 1895
  int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid,
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1896
  if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
S
Shengliang Guan 已提交
1897
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
1898 1899 1900
  }

  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false);
L
Liu Jicong 已提交
1901 1902
  applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
                                  pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
1903 1904
}

1905
static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
1906 1907 1908 1909 1910 1911
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SSessionAggOperatorInfo* pInfo = pOperator->info;
  SOptrBasicInfo*          pBInfo = &pInfo->binfo;
1912
  SExprSupp*               pSup = &pOperator->exprSupp;
1913 1914

  if (pOperator->status == OP_RES_TO_RETURN) {
1915
    while (1) {
1916
      doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
H
Haojun Liao 已提交
1917
      doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
1918

1919
      bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
1920
      if (!hasRemain) {
H
Haojun Liao 已提交
1921
        setOperatorCompleted(pOperator);
1922 1923
        break;
      }
1924

1925 1926 1927 1928 1929
      if (pBInfo->pRes->info.rows > 0) {
        break;
      }
    }
    pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
1930
    return (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
1931 1932
  }

1933 1934 1935
  int64_t st = taosGetTimestampUs();
  int32_t order = TSDB_ORDER_ASC;

1936 1937 1938
  SOperatorInfo* downstream = pOperator->pDownstream[0];

  while (1) {
1939
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
1940 1941 1942 1943 1944
    if (pBlock == NULL) {
      break;
    }

    // the pDataBlock are always the same one, no need to call this again
1945
    setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true);
1946 1947
    blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);

1948 1949 1950
    doSessionWindowAggImpl(pOperator, pInfo, pBlock);
  }

1951 1952
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;

1953 1954 1955
  // restore the value
  pOperator->status = OP_RES_TO_RETURN;

1956
  initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
1957
  blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
1958
  while (1) {
1959
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
H
Haojun Liao 已提交
1960
    doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
1961

1962
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
1963
    if (!hasRemain) {
H
Haojun Liao 已提交
1964
      setOperatorCompleted(pOperator);
1965 1966
      break;
    }
1967

1968 1969 1970 1971 1972
    if (pBInfo->pRes->info.rows > 0) {
      break;
    }
  }
  pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
1973
  return (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
1974 1975
}

1976 1977
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode,
                                             SExecTaskInfo* pTaskInfo) {
1978 1979 1980 1981 1982 1983
  SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

1984 1985 1986
  int32_t      tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
  SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr;

1987 1988 1989
  if (pStateNode->window.pExprs != NULL) {
    int32_t    numOfScalarExpr = 0;
    SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalarExpr);
H
Hongze Cheng 已提交
1990
    int32_t    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr);
1991 1992 1993 1994 1995
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

1996
  pInfo->stateCol = extractColumnFromColumnNode(pColNode);
1997 1998 1999 2000 2001 2002 2003
  pInfo->stateKey.type = pInfo->stateCol.type;
  pInfo->stateKey.bytes = pInfo->stateCol.bytes;
  pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes);
  if (pInfo->stateKey.pData == NULL) {
    goto _error;
  }

H
Haojun Liao 已提交
2004 2005 2006 2007 2008
  int32_t code = filterInitFromNode((SNode*)pStateNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

2009 2010
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;

2011 2012
  int32_t    num = 0;
  SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num);
2013
  initResultSizeInfo(&pOperator->resultInfo, 4096);
H
Haojun Liao 已提交
2014

L
Liu Jicong 已提交
2015 2016
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
                    pTaskInfo->streamInfo.pState);
2017 2018 2019 2020
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
2021
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pStateNode->window.node.pOutputDataBlockDesc);
2022
  initBasicInfo(&pInfo->binfo, pResBlock);
2023
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
2024

L
Liu Jicong 已提交
2025 2026
  pInfo->twAggSup =
      (STimeWindowAggSupp){.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType};
2027

2028 2029
  initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);

X
Xiaoyu Wang 已提交
2030
  pInfo->tsSlotId = tsSlotId;
2031

L
Liu Jicong 已提交
2032 2033
  setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
L
Liu Jicong 已提交
2034 2035
  pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, destroyStateWindowOperatorInfo,
                                         optrDefaultBufFn, NULL);
2036

2037 2038 2039 2040 2041
  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

2042 2043
  return pOperator;

L
Liu Jicong 已提交
2044
_error:
H
Haojun Liao 已提交
2045 2046 2047 2048
  if (pInfo != NULL) {
    destroyStateWindowOperatorInfo(pInfo);
  }

2049 2050
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
2051 2052 2053
  return NULL;
}

2054
void destroySWindowOperatorInfo(void* param) {
2055
  SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*)param;
2056 2057 2058
  if (pInfo == NULL) {
    return;
  }
2059

2060
  cleanupBasicInfo(&pInfo->binfo);
H
Haojun Liao 已提交
2061 2062 2063 2064
  colDataDestroy(&pInfo->twAggSup.timeWindowData);

  cleanupAggSup(&pInfo->aggSup);
  cleanupGroupResInfo(&pInfo->groupResInfo);
D
dapan1121 已提交
2065
  taosMemoryFreeClear(param);
2066 2067
}

H
Haojun Liao 已提交
2068
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
2069
                                            SExecTaskInfo* pTaskInfo) {
2070 2071 2072 2073 2074 2075
  SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo));
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

2076
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
2077
  initResultSizeInfo(&pOperator->resultInfo, 4096);
2078

2079
  int32_t      numOfCols = 0;
H
Haojun Liao 已提交
2080
  SExprInfo*   pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols);
H
Haojun Liao 已提交
2081
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc);
H
Haojun Liao 已提交
2082
  initBasicInfo(&pInfo->binfo, pResBlock);
H
Haojun Liao 已提交
2083

L
Liu Jicong 已提交
2084 2085
  int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
                            pTaskInfo->streamInfo.pState);
2086 2087 2088 2089
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
2090 2091 2092 2093
  pInfo->twAggSup.waterMark = pSessionNode->window.watermark;
  pInfo->twAggSup.calTrigger = pSessionNode->window.triggerType;
  pInfo->gap = pSessionNode->gap;

2094
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
2095 2096
  initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);

2097
  pInfo->tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
L
Liu Jicong 已提交
2098 2099 2100
  pInfo->binfo.pRes = pResBlock;
  pInfo->winSup.prevTs = INT64_MIN;
  pInfo->reptScan = false;
H
Haojun Liao 已提交
2101 2102 2103 2104
  code = filterInitFromNode((SNode*)pSessionNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
H
Haojun Liao 已提交
2105

L
Liu Jicong 已提交
2106 2107
  setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
L
Liu Jicong 已提交
2108 2109
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSessionWindowAgg, NULL, destroySWindowOperatorInfo,
                                         optrDefaultBufFn, NULL);
2110 2111
  pOperator->pTaskInfo = pTaskInfo;
  code = appendDownstream(pOperator, &downstream, 1);
2112 2113 2114 2115
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

2116 2117
  return pOperator;

L
Liu Jicong 已提交
2118
_error:
2119
  destroySWindowOperatorInfo(pInfo);
2120 2121 2122
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
L
Liu Jicong 已提交
2123
}
5
54liuyao 已提交
2124

5
54liuyao 已提交
2125
void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int32_t numOfOutput,
2126
                      SExecTaskInfo* pTaskInfo, SColumnInfoData* pTimeWindowData) {
5
54liuyao 已提交
2127 2128
  for (int32_t k = 0; k < numOfOutput; ++k) {
    if (fmIsWindowPseudoColumnFunc(pDestCtx[k].functionId)) {
2129 2130 2131 2132 2133
      if (!pTimeWindowData) {
        continue;
      }

      SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pDestCtx[k]);
L
Liu Jicong 已提交
2134 2135
      char*                p = GET_ROWCELL_INTERBUF(pEntryInfo);
      SColumnInfoData      idata = {0};
2136 2137 2138 2139 2140 2141 2142 2143
      idata.info.type = TSDB_DATA_TYPE_BIGINT;
      idata.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
      idata.pData = p;

      SScalarParam out = {.columnData = &idata};
      SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
      pDestCtx[k].sfp.process(&tw, 1, &out);
      pEntryInfo->numOfRes = 1;
L
Liu Jicong 已提交
2144
    } else if (functionNeedToExecute(&pDestCtx[k]) && pDestCtx[k].fpSet.combine != NULL) {
2145
      int32_t code = pDestCtx[k].fpSet.combine(&pDestCtx[k], &pSourceCtx[k]);
5
54liuyao 已提交
2146 2147 2148
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s apply functions error, code: %s", GET_TASKID(pTaskInfo), tstrerror(code));
        pTaskInfo->code = code;
2149
        T_LONG_JMP(pTaskInfo->env, code);
5
54liuyao 已提交
2150 2151 2152 2153 2154
      }
    }
  }
}

2155 2156
bool hasIntervalWindow(SStreamState* pState, SWinKey* pKey) {
  return TSDB_CODE_SUCCESS == streamStateGet(pState, pKey, NULL, 0);
2157 2158
}

5
54liuyao 已提交
2159
static void rebuildIntervalWindow(SOperatorInfo* pOperator, SArray* pWinArray, SHashObj* pUpdatedMap) {
2160 2161 2162 2163
  SStreamIntervalOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
  int32_t                      size = taosArrayGetSize(pWinArray);
  int32_t                      numOfOutput = pOperator->exprSupp.numOfExprs;
5
54liuyao 已提交
2164
  SExprSupp*                   pSup = &pOperator->exprSupp;
5
54liuyao 已提交
2165 2166 2167
  if (!pInfo->pChildren) {
    return;
  }
5
54liuyao 已提交
2168
  for (int32_t i = 0; i < size; i++) {
H
Haojun Liao 已提交
2169
    SWinKey*    pWinRes = taosArrayGet(pWinArray, i);
2170
    SResultRow* pCurResult = NULL;
2171
    STimeWindow parentWin = getFinalTimeWindow(pWinRes->ts, &pInfo->interval);
5
54liuyao 已提交
2172
    if (isDeletedStreamWindow(&parentWin, pWinRes->groupId, pInfo->pState, &pInfo->twAggSup)) {
2173 2174
      continue;
    }
2175

5
54liuyao 已提交
2176
    int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren);
2177
    int32_t num = 0;
5
54liuyao 已提交
2178
    for (int32_t j = 0; j < numOfChildren; j++) {
2179 2180 2181 2182
      SOperatorInfo*               pChildOp = taosArrayGetP(pInfo->pChildren, j);
      SStreamIntervalOperatorInfo* pChInfo = pChildOp->info;
      SExprSupp*                   pChildSup = &pChildOp->exprSupp;
      if (!hasIntervalWindow(pChInfo->pState, pWinRes)) {
2183 2184
        continue;
      }
2185 2186 2187 2188
      if (num == 0) {
        int32_t code = setOutputBuf(pInfo->pState, &parentWin, &pCurResult, pWinRes->groupId, pSup->pCtx, numOfOutput,
                                    pSup->rowEntryInfoOffset, &pInfo->aggSup);
        if (code != TSDB_CODE_SUCCESS || pCurResult == NULL) {
S
Shengliang Guan 已提交
2189
          T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
2190 2191
        }
      }
2192
      num++;
2193
      SResultRow* pChResult = NULL;
2194 2195
      setOutputBuf(pChInfo->pState, &parentWin, &pChResult, pWinRes->groupId, pChildSup->pCtx, pChildSup->numOfExprs,
                   pChildSup->rowEntryInfoOffset, &pChInfo->aggSup);
2196 2197
      updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin, true);
      compactFunctions(pSup->pCtx, pChildSup->pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
5
54liuyao 已提交
2198
      releaseOutputBuf(pChInfo->pState, pWinRes, pChResult);
5
54liuyao 已提交
2199
    }
2200
    if (num > 0 && pUpdatedMap) {
2201 2202 2203
      saveWinResultInfo(pCurResult->win.skey, pWinRes->groupId, pUpdatedMap);
      saveOutputBuf(pInfo->pState, pWinRes, pCurResult, pInfo->aggSup.resultRowSize);
      releaseOutputBuf(pInfo->pState, pWinRes, pCurResult);
2204
    }
5
54liuyao 已提交
2205 2206 2207 2208 2209
  }
}

bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) {
  SET_RES_WINDOW_KEY(pSup->keyBuf, &pWin->skey, sizeof(int64_t), groupId);
2210
  SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf,
L
Liu Jicong 已提交
2211
                                                               GET_RES_WINDOW_KEY_LEN(sizeof(int64_t)));
5
54liuyao 已提交
2212 2213 2214
  return p1 == NULL;
}

2215
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup) {
5
54liuyao 已提交
2216 2217
  if (pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark) {
    SWinKey key = {.ts = pWin->skey, .groupId = groupId};
5
54liuyao 已提交
2218
    if (streamStateGet(pState, &key, NULL, 0) == TSDB_CODE_SUCCESS) {
5
54liuyao 已提交
2219 2220
      return false;
    }
2221
    return true;
5
54liuyao 已提交
2222 2223 2224 2225
  }
  return false;
}

L
Liu Jicong 已提交
2226 2227 2228 2229
int32_t getNexWindowPos(SInterval* pInterval, SDataBlockInfo* pBlockInfo, TSKEY* tsCols, int32_t startPos, TSKEY eKey,
                        STimeWindow* pNextWin) {
  int32_t forwardRows =
      getNumOfRowsInTimeWindow(pBlockInfo, tsCols, startPos, eKey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
5
54liuyao 已提交
2230 2231 2232 2233
  int32_t prevEndPos = forwardRows - 1 + startPos;
  return getNextQualifiedWindow(pInterval, pNextWin, pBlockInfo, tsCols, prevEndPos, TSDB_ORDER_ASC);
}

H
Haojun Liao 已提交
2234
void addPullWindow(SHashObj* pMap, SWinKey* pWinRes, int32_t size) {
5
54liuyao 已提交
2235 2236 2237 2238
  SArray* childIds = taosArrayInit(8, sizeof(int32_t));
  for (int32_t i = 0; i < size; i++) {
    taosArrayPush(childIds, &i);
  }
H
Haojun Liao 已提交
2239
  taosHashPut(pMap, pWinRes, sizeof(SWinKey), &childIds, sizeof(void*));
5
54liuyao 已提交
2240 2241 2242 2243
}

static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; }

2244
static void clearStreamIntervalOperator(SStreamIntervalOperatorInfo* pInfo) {
2245
  tSimpleHashClear(pInfo->aggSup.pResultRowHashTable);
5
54liuyao 已提交
2246
  clearDiskbasedBuf(pInfo->aggSup.pResultBuf);
2247
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
L
Liu Jicong 已提交
2248
  pInfo->aggSup.currentPageId = -1;
2249
  streamStateClear(pInfo->pState);
5
54liuyao 已提交
2250 2251
}

5
54liuyao 已提交
2252 2253 2254 2255
static void clearSpecialDataBlock(SSDataBlock* pBlock) {
  if (pBlock->info.rows <= 0) {
    return;
  }
5
54liuyao 已提交
2256 2257 2258
  blockDataCleanup(pBlock);
}

5
54liuyao 已提交
2259 2260 2261 2262 2263 2264
static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pBlock) {
  clearSpecialDataBlock(pBlock);
  int32_t size = taosArrayGetSize(array);
  if (size - (*pIndex) == 0) {
    return;
  }
L
Liu Jicong 已提交
2265
  blockDataEnsureCapacity(pBlock, size - (*pIndex));
2266 2267 2268 2269 2270
  SColumnInfoData* pStartTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pGroupId = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
5
54liuyao 已提交
2271
  for (; (*pIndex) < size; (*pIndex)++) {
L
Liu Jicong 已提交
2272
    SPullWindowInfo* pWin = taosArrayGet(array, (*pIndex));
5
54liuyao 已提交
2273 2274 2275
    colDataAppend(pStartTs, pBlock->info.rows, (const char*)&pWin->window.skey, false);
    colDataAppend(pEndTs, pBlock->info.rows, (const char*)&pWin->window.ekey, false);
    colDataAppend(pGroupId, pBlock->info.rows, (const char*)&pWin->groupId, false);
5
54liuyao 已提交
2276 2277
    colDataAppend(pCalStartTs, pBlock->info.rows, (const char*)&pWin->calWin.skey, false);
    colDataAppend(pCalEndTs, pBlock->info.rows, (const char*)&pWin->calWin.ekey, false);
5
54liuyao 已提交
2278 2279 2280 2281 2282 2283 2284 2285 2286
    pBlock->info.rows++;
  }
  if ((*pIndex) == size) {
    *pIndex = 0;
    taosArrayClear(array);
  }
  blockDataUpdateTsWindow(pBlock, 0);
}

5
54liuyao 已提交
2287
void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SInterval* pInterval) {
5
54liuyao 已提交
2288
  SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
L
Liu Jicong 已提交
2289
  TSKEY*           tsData = (TSKEY*)pStartCol->pData;
5
54liuyao 已提交
2290 2291
  SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  TSKEY*           tsEndData = (TSKEY*)pEndCol->pData;
5
54liuyao 已提交
2292
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
L
Liu Jicong 已提交
2293 2294
  uint64_t*        groupIdData = (uint64_t*)pGroupCol->pData;
  int32_t          chId = getChildIndex(pBlock);
5
54liuyao 已提交
2295
  for (int32_t i = 0; i < pBlock->info.rows; i++) {
5
54liuyao 已提交
2296 2297 2298 2299 2300 2301 2302 2303 2304 2305 2306 2307 2308 2309 2310
    TSKEY winTs = tsData[i];
    while (winTs < tsEndData[i]) {
      SWinKey winRes = {.ts = winTs, .groupId = groupIdData[i]};
      void*   chIds = taosHashGet(pMap, &winRes, sizeof(SWinKey));
      if (chIds) {
        SArray* chArray = *(SArray**)chIds;
        int32_t index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
        if (index != -1) {
          qDebug("===stream===window %" PRId64 " delete child id %d", winRes.ts, chId);
          taosArrayRemove(chArray, index);
          if (taosArrayGetSize(chArray) == 0) {
            // pull data is over
            taosArrayDestroy(chArray);
            taosHashRemove(pMap, &winRes, sizeof(SWinKey));
          }
5
54liuyao 已提交
2311 2312
        }
      }
5
54liuyao 已提交
2313
      winTs = taosTimeAdd(winTs, pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
5
54liuyao 已提交
2314 2315 2316
    }
  }
}
5
54liuyao 已提交
2317

2318
static void addRetriveWindow(SArray* wins, SStreamIntervalOperatorInfo* pInfo) {
2319 2320
  int32_t size = taosArrayGetSize(wins);
  for (int32_t i = 0; i < size; i++) {
L
Liu Jicong 已提交
2321
    SWinKey*    winKey = taosArrayGet(wins, i);
2322
    STimeWindow nextWin = getFinalTimeWindow(winKey->ts, &pInfo->interval);
2323
    if (needDeleteWindowBuf(&nextWin, &pInfo->twAggSup) && !pInfo->ignoreExpiredData) {
2324 2325
      void* chIds = taosHashGet(pInfo->pPullDataMap, winKey, sizeof(SWinKey));
      if (!chIds) {
L
Liu Jicong 已提交
2326 2327
        SPullWindowInfo pull = {
            .window = nextWin, .groupId = winKey->groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey};
2328
        // add pull data request
5
54liuyao 已提交
2329 2330 2331 2332 2333
        if (savePullWindow(&pull, pInfo->pPullWins) == TSDB_CODE_SUCCESS) {
          int32_t size1 = taosArrayGetSize(pInfo->pChildren);
          addPullWindow(pInfo->pPullDataMap, winKey, size1);
          qDebug("===stream===prepare retrive for delete %" PRId64 ", size:%d", winKey->ts, size1);
        }
2334 2335 2336 2337 2338
      }
    }
  }
}

5
54liuyao 已提交
2339 2340 2341 2342 2343 2344
static void clearFunctionContext(SExprSupp* pSup) {
  for (int32_t i = 0; i < pSup->numOfExprs; i++) {
    pSup->pCtx[i].saveHandle.currentPage = -1;
  }
}

2345 2346 2347 2348 2349 2350 2351 2352 2353 2354 2355
void doBuildResult(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  // set output datablock version
  pBlock->info.version = pTaskInfo->version;

  blockDataCleanup(pBlock);
  if (!hasRemainResults(pGroupResInfo)) {
    return;
  }

  // clear the existed group id
H
Haojun Liao 已提交
2356
  pBlock->info.id.groupId = 0;
2357
  buildDataBlockFromGroupRes(pOperator, pState, pBlock, &pOperator->exprSupp, pGroupResInfo);
2358 2359
}

5
54liuyao 已提交
2360 2361 2362 2363 2364 2365 2366 2367 2368 2369 2370
static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
                                           TSKEY* primaryKeys, int32_t prevPosition) {
  int32_t startPos = prevPosition + 1;
  if (startPos == pDataBlockInfo->rows) {
    startPos = -1;
  } else {
    *pNext = getFinalTimeWindow(primaryKeys[startPos], pInterval);
  }
  return startPos;
}

2371 2372 2373 2374 2375 2376 2377 2378 2379 2380 2381 2382 2383 2384 2385 2386 2387 2388
static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t groupId,
                                    SHashObj* pUpdatedMap) {
  SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperatorInfo->info;

  SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo);
  SExecTaskInfo*  pTaskInfo = pOperatorInfo->pTaskInfo;
  SExprSupp*      pSup = &pOperatorInfo->exprSupp;
  int32_t         numOfOutput = pSup->numOfExprs;
  int32_t         step = 1;
  TSKEY*          tsCols = NULL;
  SResultRow*     pResult = NULL;
  int32_t         forwardRows = 0;

  SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
  tsCols = (int64_t*)pColDataInfo->pData;

  int32_t     startPos = 0;
  TSKEY       ts = getStartTsKey(&pSDataBlock->info.window, tsCols);
5
54liuyao 已提交
2389 2390 2391 2392 2393 2394
  STimeWindow nextWin = {0};
  if (IS_FINAL_OP(pInfo)) {
    nextWin = getFinalTimeWindow(ts, &pInfo->interval);
  } else {
    nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, TSDB_ORDER_ASC);
  }
2395 2396 2397 2398 2399 2400 2401 2402 2403 2404 2405 2406 2407 2408 2409 2410 2411 2412
  while (1) {
    bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup);
    if ((pInfo->ignoreExpiredData && isClosed) || !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) {
      startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin);
      if (startPos < 0) {
        break;
      }
      continue;
    }

    if (IS_FINAL_OP(pInfo) && isClosed && pInfo->pChildren) {
      bool    ignore = true;
      SWinKey winRes = {
          .ts = nextWin.skey,
          .groupId = groupId,
      };
      void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey));
      if (isDeletedStreamWindow(&nextWin, groupId, pInfo->pState, &pInfo->twAggSup) && !chIds) {
L
Liu Jicong 已提交
2413 2414
        SPullWindowInfo pull = {
            .window = nextWin, .groupId = groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey};
2415
        // add pull data request
5
54liuyao 已提交
2416 2417 2418 2419 2420
        if (savePullWindow(&pull, pInfo->pPullWins) == TSDB_CODE_SUCCESS) {
          int32_t size = taosArrayGetSize(pInfo->pChildren);
          addPullWindow(pInfo->pPullDataMap, &winRes, size);
          qDebug("===stream===prepare retrive %" PRId64 ", size:%d", winRes.ts, size);
        }
2421 2422 2423 2424 2425 2426 2427 2428 2429 2430 2431 2432 2433 2434 2435 2436 2437 2438 2439 2440 2441 2442 2443 2444 2445 2446
      } else {
        int32_t index = -1;
        SArray* chArray = NULL;
        int32_t chId = 0;
        if (chIds) {
          chArray = *(void**)chIds;
          chId = getChildIndex(pSDataBlock);
          index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
        }
        if (index == -1 || pSDataBlock->info.type == STREAM_PULL_DATA) {
          ignore = false;
        }
      }

      if (ignore) {
        startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin);
        if (startPos < 0) {
          break;
        }
        continue;
      }
    }

    int32_t code = setOutputBuf(pInfo->pState, &nextWin, &pResult, groupId, pSup->pCtx, numOfOutput,
                                pSup->rowEntryInfoOffset, &pInfo->aggSup);
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
S
Shengliang Guan 已提交
2447
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
2448 2449
    }

5
54liuyao 已提交
2450 2451 2452 2453 2454 2455
    if (IS_FINAL_OP(pInfo)) {
      forwardRows = 1;
    } else {
      forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey,
                                             NULL, TSDB_ORDER_ASC);
    }
2456 2457 2458
    if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) {
      saveWinResultInfo(pResult->win.skey, groupId, pUpdatedMap);
    }
5
54liuyao 已提交
2459 2460 2461 2462 2463 2464 2465 2466

    if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
      SWinKey key = {
          .ts = pResult->win.skey,
          .groupId = groupId,
      };
      tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), NULL, 0);
    }
2467
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
H
Haojun Liao 已提交
2468
    applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
L
Liu Jicong 已提交
2469
                                    pSDataBlock->info.rows, numOfOutput);
2470 2471 2472 2473 2474 2475 2476 2477 2478 2479
    SWinKey key = {
        .ts = nextWin.skey,
        .groupId = groupId,
    };
    saveOutputBuf(pInfo->pState, &key, pResult, pInfo->aggSup.resultRowSize);
    releaseOutputBuf(pInfo->pState, &key, pResult);
    if (pInfo->delKey.ts > key.ts) {
      pInfo->delKey = key;
    }
    int32_t prevEndPos = (forwardRows - 1) * step + startPos;
2480 2481 2482 2483 2484 2485 2486 2487 2488 2489 2490 2491 2492
    if (pSDataBlock->info.window.skey <= 0 || pSDataBlock->info.window.ekey <= 0) {
      qError("table uid %" PRIu64 " data block timestamp range may not be calculated! minKey %" PRId64
             ",maxKey %" PRId64,
             pSDataBlock->info.id.uid, pSDataBlock->info.window.skey, pSDataBlock->info.window.ekey);
      blockDataUpdateTsWindow(pSDataBlock, 0);

      // timestamp of the data is incorrect
      if (pSDataBlock->info.window.skey <= 0 || pSDataBlock->info.window.ekey <= 0) {
        qError("table uid %" PRIu64 " data block timestamp is out of range! minKey %" PRId64 ",maxKey %" PRId64,
               pSDataBlock->info.id.uid, pSDataBlock->info.window.skey, pSDataBlock->info.window.ekey);
      }
    }

5
54liuyao 已提交
2493 2494 2495 2496 2497 2498
    if (IS_FINAL_OP(pInfo)) {
      startPos = getNextQualifiedFinalWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos);
    } else {
      startPos =
          getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, TSDB_ORDER_ASC);
    }
2499 2500 2501 2502 2503 2504
    if (startPos < 0) {
      break;
    }
  }
}

5
54liuyao 已提交
2505
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
2506
  SStreamIntervalOperatorInfo* pInfo = pOperator->info;
L
Liu Jicong 已提交
2507
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
L
Liu Jicong 已提交
2508 2509 2510

  SOperatorInfo* downstream = pOperator->pDownstream[0];
  TSKEY          maxTs = INT64_MIN;
5
54liuyao 已提交
2511
  TSKEY          minTs = INT64_MAX;
5
54liuyao 已提交
2512

2513 2514
  SExprSupp* pSup = &pOperator->exprSupp;

5
54liuyao 已提交
2515
  qDebug("interval status %d %s", pOperator->status, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
L
Liu Jicong 已提交
2516

5
54liuyao 已提交
2517 2518 2519
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  } else if (pOperator->status == OP_RES_TO_RETURN) {
5
54liuyao 已提交
2520 2521 2522
    doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes);
    if (pInfo->pPullDataRes->info.rows != 0) {
      // process the rest of the data
5
54liuyao 已提交
2523
      printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
5
54liuyao 已提交
2524 2525 2526
      return pInfo->pPullDataRes;
    }

2527
    doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
2528 2529 2530 2531 2532 2533
    if (pInfo->pDelRes->info.rows != 0) {
      // process the rest of the data
      printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
      return pInfo->pDelRes;
    }

2534
    doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
5
54liuyao 已提交
2535 2536 2537
    if (pInfo->binfo.pRes->info.rows != 0) {
      printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
      return pInfo->binfo.pRes;
5
54liuyao 已提交
2538
    }
5
54liuyao 已提交
2539

H
Haojun Liao 已提交
2540
    setOperatorCompleted(pOperator);
5
54liuyao 已提交
2541 2542 2543 2544 2545 2546
    if (!IS_FINAL_OP(pInfo)) {
      clearFunctionContext(&pOperator->exprSupp);
      // semi interval operator clear disk buffer
      clearStreamIntervalOperator(pInfo);
      qDebug("===stream===clear semi operator");
    } else {
2547 2548
      deleteIntervalDiscBuf(pInfo->pState, pInfo->pPullDataMap, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark,
                            &pInfo->interval, &pInfo->delKey);
L
Liu Jicong 已提交
2549
      streamStateCommit(pTaskInfo->streamInfo.pState);
5
54liuyao 已提交
2550 2551
    }
    return NULL;
5
54liuyao 已提交
2552
  } else {
5
54liuyao 已提交
2553
    if (!IS_FINAL_OP(pInfo)) {
2554
      doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
2555 2556 2557 2558 2559 2560
      if (pInfo->pDelRes->info.rows != 0) {
        // process the rest of the data
        printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
        return pInfo->pDelRes;
      }

2561
      doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
5
54liuyao 已提交
2562
      if (pInfo->binfo.pRes->info.rows != 0) {
5
54liuyao 已提交
2563
        printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
5
54liuyao 已提交
2564 2565
        return pInfo->binfo.pRes;
      }
5
54liuyao 已提交
2566
    }
5
54liuyao 已提交
2567 2568
  }

5
54liuyao 已提交
2569 2570 2571
  SArray*    pUpdated = taosArrayInit(4, POINTER_BYTES);
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  SHashObj*  pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
5
54liuyao 已提交
2572 2573 2574
  while (1) {
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
    if (pBlock == NULL) {
5
54liuyao 已提交
2575
      pOperator->status = OP_RES_TO_RETURN;
L
Liu Jicong 已提交
2576 2577
      qDebug("===stream===return data:%s. recv datablock num:%" PRIu64,
             IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", pInfo->numOfDatapack);
5
54liuyao 已提交
2578
      pInfo->numOfDatapack = 0;
5
54liuyao 已提交
2579 2580
      break;
    }
5
54liuyao 已提交
2581
    pInfo->numOfDatapack++;
5
54liuyao 已提交
2582
    printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval final recv" : "interval semi recv");
2583

H
Haojun Liao 已提交
2584
    if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) {
5
54liuyao 已提交
2585
      pInfo->binfo.pRes->info.type = pBlock->info.type;
2586 2587
    } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
               pBlock->info.type == STREAM_CLEAR) {
2588
      SArray* delWins = taosArrayInit(8, sizeof(SWinKey));
2589
      doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pUpdatedMap);
2590
      if (IS_FINAL_OP(pInfo)) {
2591 2592 2593 2594
        int32_t                      childIndex = getChildIndex(pBlock);
        SOperatorInfo*               pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
        SStreamIntervalOperatorInfo* pChildInfo = pChildOp->info;
        SExprSupp*                   pChildSup = &pChildOp->exprSupp;
2595
        doDeleteWindows(pChildOp, &pChildInfo->interval, pBlock, NULL, NULL);
5
54liuyao 已提交
2596
        rebuildIntervalWindow(pOperator, delWins, pUpdatedMap);
2597 2598 2599
        addRetriveWindow(delWins, pInfo);
        taosArrayAddAll(pInfo->pDelWins, delWins);
        taosArrayDestroy(delWins);
2600 2601
        continue;
      }
2602 2603 2604
      removeResults(delWins, pUpdatedMap);
      taosArrayAddAll(pInfo->pDelWins, delWins);
      taosArrayDestroy(delWins);
2605
      break;
5
54liuyao 已提交
2606
    } else if (pBlock->info.type == STREAM_GET_ALL && IS_FINAL_OP(pInfo)) {
5
54liuyao 已提交
2607
      getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap);
5
54liuyao 已提交
2608
      continue;
5
54liuyao 已提交
2609
    } else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) {
2610
      doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pUpdatedMap);
5
54liuyao 已提交
2611 2612 2613 2614
      if (taosArrayGetSize(pUpdated) > 0) {
        break;
      }
      continue;
L
Liu Jicong 已提交
2615
    } else if (pBlock->info.type == STREAM_PULL_OVER && IS_FINAL_OP(pInfo)) {
5
54liuyao 已提交
2616
      processPullOver(pBlock, pInfo->pPullDataMap, &pInfo->interval);
5
54liuyao 已提交
2617
      continue;
5
54liuyao 已提交
2618
    }
5
54liuyao 已提交
2619

5
54liuyao 已提交
2620 2621 2622 2623
    if (pInfo->scalarSupp.pExprInfo != NULL) {
      SExprSupp* pExprSup = &pInfo->scalarSupp;
      projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
    }
2624
    setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
H
Haojun Liao 已提交
2625
    doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pUpdatedMap);
5
54liuyao 已提交
2626
    if (IS_FINAL_OP(pInfo)) {
S
shenglian zhou 已提交
2627
      int32_t chIndex = getChildIndex(pBlock);
5
54liuyao 已提交
2628 2629 2630 2631 2632
      int32_t size = taosArrayGetSize(pInfo->pChildren);
      // if chIndex + 1 - size > 0, add new child
      for (int32_t i = 0; i < chIndex + 1 - size; i++) {
        SOperatorInfo* pChildOp = createStreamFinalIntervalOperatorInfo(NULL, pInfo->pPhyNode, pOperator->pTaskInfo, 0);
        if (!pChildOp) {
S
Shengliang Guan 已提交
2633
          T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
5
54liuyao 已提交
2634
        }
2635
        SStreamIntervalOperatorInfo* pTmpInfo = pChildOp->info;
2636
        pTmpInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
5
54liuyao 已提交
2637
        taosArrayPush(pInfo->pChildren, &pChildOp);
5
54liuyao 已提交
2638
        qDebug("===stream===add child, id:%d", chIndex);
5
54liuyao 已提交
2639
      }
2640 2641
      SOperatorInfo*               pChildOp = taosArrayGetP(pInfo->pChildren, chIndex);
      SStreamIntervalOperatorInfo* pChInfo = pChildOp->info;
2642
      setInputDataBlock(&pChildOp->exprSupp, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
H
Haojun Liao 已提交
2643
      doStreamIntervalAggImpl(pChildOp, pBlock, pBlock->info.id.groupId, NULL);
5
54liuyao 已提交
2644
    }
5
54liuyao 已提交
2645 2646 2647
    maxTs = TMAX(maxTs, pBlock->info.window.ekey);
    maxTs = TMAX(maxTs, pBlock->info.watermark);
    minTs = TMIN(minTs, pBlock->info.window.skey);
5
54liuyao 已提交
2648
  }
S
shenglian zhou 已提交
2649

2650
  removeDeleteResults(pUpdatedMap, pInfo->pDelWins);
5
54liuyao 已提交
2651
  pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
5
54liuyao 已提交
2652
  pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs);
5
54liuyao 已提交
2653
  if (IS_FINAL_OP(pInfo)) {
2654
    closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval,
2655
                              pInfo->pPullDataMap, pUpdatedMap, pInfo->pDelWins, pOperator);
2656
    closeChildIntervalWindow(pOperator, pInfo->pChildren, pInfo->twAggSup.maxTs);
5
54liuyao 已提交
2657
  }
2658
  pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs;
5
54liuyao 已提交
2659

5
54liuyao 已提交
2660 2661 2662 2663 2664 2665 2666
  void* pIte = NULL;
  while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) {
    taosArrayPush(pUpdated, pIte);
  }
  taosHashCleanup(pUpdatedMap);
  taosArraySort(pUpdated, resultrowComparAsc);

5
54liuyao 已提交
2667 2668
  initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
  blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
5
54liuyao 已提交
2669 2670 2671 2672

  doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes);
  if (pInfo->pPullDataRes->info.rows != 0) {
    // process the rest of the data
5
54liuyao 已提交
2673
    printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
5
54liuyao 已提交
2674 2675 2676
    return pInfo->pPullDataRes;
  }

2677
  doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
5
54liuyao 已提交
2678 2679 2680 2681 2682 2683
  if (pInfo->pDelRes->info.rows != 0) {
    // process the rest of the data
    printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
    return pInfo->pDelRes;
  }

2684
  doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
5
54liuyao 已提交
2685
  if (pInfo->binfo.pRes->info.rows != 0) {
5
54liuyao 已提交
2686
    printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
5
54liuyao 已提交
2687 2688 2689 2690 2691 2692
    return pInfo->binfo.pRes;
  }

  return NULL;
}

5
54liuyao 已提交
2693 2694 2695 2696
int64_t getDeleteMark(SIntervalPhysiNode* pIntervalPhyNode) {
  if (pIntervalPhyNode->window.deleteMark <= 0) {
    return DEAULT_DELETE_MARK;
  }
L
Liu Jicong 已提交
2697
  int64_t deleteMark = TMAX(pIntervalPhyNode->window.deleteMark, pIntervalPhyNode->window.watermark);
5
54liuyao 已提交
2698 2699 2700 2701
  deleteMark = TMAX(deleteMark, pIntervalPhyNode->interval);
  return deleteMark;
}

S
shenglian zhou 已提交
2702 2703
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
                                                     SExecTaskInfo* pTaskInfo, int32_t numOfChild) {
2704 2705 2706
  SIntervalPhysiNode*          pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
  SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo));
  SOperatorInfo*               pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
5
54liuyao 已提交
2707 2708 2709
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }
2710

2711
  pOperator->pTaskInfo = pTaskInfo;
S
shenglian zhou 已提交
2712 2713 2714 2715 2716 2717 2718 2719
  pInfo->interval = (SInterval){.interval = pIntervalPhyNode->interval,
                                .sliding = pIntervalPhyNode->sliding,
                                .intervalUnit = pIntervalPhyNode->intervalUnit,
                                .slidingUnit = pIntervalPhyNode->slidingUnit,
                                .offset = pIntervalPhyNode->offset,
                                .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
  pInfo->twAggSup = (STimeWindowAggSupp){
      .waterMark = pIntervalPhyNode->window.watermark,
5
54liuyao 已提交
2720 2721
      .calTrigger = pIntervalPhyNode->window.triggerType,
      .maxTs = INT64_MIN,
5
54liuyao 已提交
2722
      .minTs = INT64_MAX,
5
54liuyao 已提交
2723
      .deleteMark = getDeleteMark(pIntervalPhyNode),
L
Liu Jicong 已提交
2724 2725
      .deleteMarkSaved = 0,
      .calTriggerSaved = 0,
S
shenglian zhou 已提交
2726
  };
5
54liuyao 已提交
2727
  ASSERTS(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay");
5
54liuyao 已提交
2728 2729
  pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
2730
  initResultSizeInfo(&pOperator->resultInfo, 4096);
5
54liuyao 已提交
2731 2732 2733 2734 2735 2736 2737 2738 2739
  if (pIntervalPhyNode->window.pExprs != NULL) {
    int32_t    numOfScalar = 0;
    SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar);
    int32_t    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

S
shenglian zhou 已提交
2740 2741
  int32_t      numOfCols = 0;
  SExprInfo*   pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols);
H
Haojun Liao 已提交
2742
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
5
54liuyao 已提交
2743
  initBasicInfo(&pInfo->binfo, pResBlock);
2744

L
Liu Jicong 已提交
2745 2746
  int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
                            pTaskInfo->streamInfo.pState);
2747 2748 2749 2750
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
2751
  initStreamFunciton(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs);
2752

5
54liuyao 已提交
2753
  initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
2754

2755 2756 2757 2758
  pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
  *(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
  streamStateSetNumber(pInfo->pState, -1);

2759
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
5
54liuyao 已提交
2760 2761
  pInfo->pChildren = NULL;
  if (numOfChild > 0) {
2762
    pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*));
5
54liuyao 已提交
2763 2764 2765
    if (!pInfo->pChildren) {
      goto _error;
    }
5
54liuyao 已提交
2766 2767 2768
    for (int32_t i = 0; i < numOfChild; i++) {
      SOperatorInfo* pChildOp = createStreamFinalIntervalOperatorInfo(NULL, pPhyNode, pTaskInfo, 0);
      if (pChildOp) {
2769
        SStreamIntervalOperatorInfo* pChInfo = pChildOp->info;
2770
        pChInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
5
54liuyao 已提交
2771
        taosArrayPush(pInfo->pChildren, &pChildOp);
2772
        streamStateSetNumber(pChInfo->pState, i);
5
54liuyao 已提交
2773 2774 2775 2776 2777
        continue;
      }
      goto _error;
    }
  }
5
54liuyao 已提交
2778

2779
  pInfo->pPhyNode = (SPhysiNode*)nodesCloneNode((SNode*)pPhyNode);
5
54liuyao 已提交
2780

5
54liuyao 已提交
2781 2782 2783 2784
  if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
    pInfo->isFinal = true;
    pOperator->name = "StreamFinalIntervalOperator";
  } else {
5
54liuyao 已提交
2785
    // semi interval operator does not catch result
5
54liuyao 已提交
2786 2787 2788 2789
    pInfo->isFinal = false;
    pOperator->name = "StreamSemiIntervalOperator";
  }

5
54liuyao 已提交
2790
  if (!IS_FINAL_OP(pInfo) || numOfChild == 0) {
5
54liuyao 已提交
2791 2792
    pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
  }
5
54liuyao 已提交
2793 2794 2795 2796
  pInfo->pPullWins = taosArrayInit(8, sizeof(SPullWindowInfo));
  pInfo->pullIndex = 0;
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  pInfo->pPullDataMap = taosHashInit(64, hashFn, false, HASH_NO_LOCK);
2797
  pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE);
5
54liuyao 已提交
2798
  pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired;
2799
  pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
2800
  pInfo->delIndex = 0;
H
Haojun Liao 已提交
2801
  pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
2802 2803
  pInfo->delKey.ts = INT64_MAX;
  pInfo->delKey.groupId = 0;
5
54liuyao 已提交
2804
  pInfo->numOfDatapack = 0;
5
54liuyao 已提交
2805

5
54liuyao 已提交
2806
  pOperator->operatorType = pPhyNode->type;
5
54liuyao 已提交
2807 2808 2809 2810
  pOperator->blocking = true;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;

L
Liu Jicong 已提交
2811 2812
  pOperator->fpSet = createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo,
                                         optrDefaultBufFn, NULL);
2813
  if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
5
54liuyao 已提交
2814
    initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup);
2815
  }
5
54liuyao 已提交
2816 2817 2818 2819 2820 2821 2822 2823
  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
2824
  destroyStreamFinalIntervalOperatorInfo(pInfo);
5
54liuyao 已提交
2825 2826 2827
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
5
54liuyao 已提交
2828
}
5
54liuyao 已提交
2829 2830

void destroyStreamAggSupporter(SStreamAggSupporter* pSup) {
5
54liuyao 已提交
2831
  tSimpleHashCleanup(pSup->pResultRows);
5
54liuyao 已提交
2832 2833
  destroyDiskbasedBuf(pSup->pResultBuf);
  blockDataDestroy(pSup->pScanBlock);
5
54liuyao 已提交
2834 2835
  taosMemoryFreeClear(pSup->pState);
  taosMemoryFreeClear(pSup->pDummyCtx);
5
54liuyao 已提交
2836 2837
}

2838
void destroyStreamSessionAggOperatorInfo(void* param) {
5
54liuyao 已提交
2839
  SStreamSessionAggOperatorInfo* pInfo = (SStreamSessionAggOperatorInfo*)param;
2840
  cleanupBasicInfo(&pInfo->binfo);
5
54liuyao 已提交
2841
  destroyStreamAggSupporter(&pInfo->streamAggSup);
5
54liuyao 已提交
2842

2843 2844 2845
  if (pInfo->pChildren != NULL) {
    int32_t size = taosArrayGetSize(pInfo->pChildren);
    for (int32_t i = 0; i < size; i++) {
5
54liuyao 已提交
2846 2847
      SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, i);
      destroyOperatorInfo(pChild);
2848
    }
5
54liuyao 已提交
2849
    taosArrayDestroy(pInfo->pChildren);
2850
  }
5
54liuyao 已提交
2851 2852 2853 2854
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
  blockDataDestroy(pInfo->pDelRes);
  blockDataDestroy(pInfo->pWinBlock);
  blockDataDestroy(pInfo->pUpdateRes);
5
54liuyao 已提交
2855
  tSimpleHashCleanup(pInfo->pStDeleted);
2856

D
dapan1121 已提交
2857
  taosMemoryFreeClear(param);
5
54liuyao 已提交
2858 2859
}

2860 2861
int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfCols,
                        SSDataBlock* pResultBlock) {
H
Haojun Liao 已提交
2862
  initBasicInfo(pBasicInfo, pResultBlock);
2863 2864 2865 2866
  int32_t code = initExprSupp(pSup, pExprInfo, numOfCols);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }
2867

H
Haojun Liao 已提交
2868
  initStreamFunciton(pSup->pCtx, pSup->numOfExprs);
5
54liuyao 已提交
2869
  for (int32_t i = 0; i < numOfCols; ++i) {
2870
    pSup->pCtx[i].saveHandle.pBuf = NULL;
5
54liuyao 已提交
2871
  }
2872

2873
  ASSERT(numOfCols > 0);
5
54liuyao 已提交
2874 2875 2876 2877 2878 2879 2880 2881
  return TSDB_CODE_SUCCESS;
}

void initDummyFunction(SqlFunctionCtx* pDummy, SqlFunctionCtx* pCtx, int32_t nums) {
  for (int i = 0; i < nums; i++) {
    pDummy[i].functionId = pCtx[i].functionId;
  }
}
5
54liuyao 已提交
2882

5
54liuyao 已提交
2883 2884
void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, int32_t tsColIndex,
                    STimeWindowAggSupp* pTwSup) {
2885 2886 2887 2888 2889 2890
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION) {
    SStreamPartitionOperatorInfo* pScanInfo = downstream->info;
    pScanInfo->tsColIndex = tsColIndex;
  }

  if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
5
54liuyao 已提交
2891
    initDownStream(downstream->pDownstream[0], pAggSup, type, tsColIndex, pTwSup);
2892 2893
    return;
  }
2894
  SStreamScanInfo* pScanInfo = downstream->info;
5
54liuyao 已提交
2895
  pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type};
5
54liuyao 已提交
2896
  if (!pScanInfo->pUpdateInfo) {
5
54liuyao 已提交
2897
    pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark);
5
54liuyao 已提交
2898
  }
5
54liuyao 已提交
2899
  pScanInfo->twAggSup = *pTwSup;
5
54liuyao 已提交
2900 2901
}

5
54liuyao 已提交
2902 2903 2904 2905 2906 2907 2908 2909 2910 2911
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, int64_t gap,
                               SStreamState* pState, int32_t keySize, int16_t keyType) {
  pSup->resultRowSize = keySize + getResultRowSize(pCtx, numOfOutput);
  pSup->pScanBlock = createSpecialDataBlock(STREAM_CLEAR);
  pSup->gap = gap;
  pSup->stateKeySize = keySize;
  pSup->stateKeyType = keyType;
  pSup->pDummyCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfOutput, sizeof(SqlFunctionCtx));
  if (pSup->pDummyCtx == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
5
54liuyao 已提交
2912
  }
H
Haojun Liao 已提交
2913

5
54liuyao 已提交
2914 2915 2916 2917
  initDummyFunction(pSup->pDummyCtx, pCtx, numOfOutput);
  pSup->pState = taosMemoryCalloc(1, sizeof(SStreamState));
  *(pSup->pState) = *pState;
  streamStateSetNumber(pSup->pState, -1);
2918

5
54liuyao 已提交
2919 2920
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  pSup->pResultRows = tSimpleHashInit(32, hashFn);
X
Xiaoyu Wang 已提交
2921

5
54liuyao 已提交
2922 2923 2924
  int32_t pageSize = 4096;
  while (pageSize < pSup->resultRowSize * 4) {
    pageSize <<= 1u;
5
54liuyao 已提交
2925
  }
5
54liuyao 已提交
2926 2927 2928 2929
  // at least four pages need to be in buffer
  int32_t bufSize = 4096 * 256;
  if (bufSize <= pageSize) {
    bufSize = pageSize * 4;
5
54liuyao 已提交
2930
  }
5
54liuyao 已提交
2931 2932 2933 2934
  if (!osTempSpaceAvailable()) {
    terrno = TSDB_CODE_NO_AVAIL_DISK;
    qError("Init stream agg supporter failed since %s", terrstr(terrno));
    return terrno;
5
54liuyao 已提交
2935
  }
5
54liuyao 已提交
2936 2937 2938
  int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, "function", tsTempDir);
  for (int32_t i = 0; i < numOfOutput; ++i) {
    pCtx[i].saveHandle.pBuf = pSup->pResultBuf;
5
54liuyao 已提交
2939 2940
  }

5
54liuyao 已提交
2941
  return TSDB_CODE_SUCCESS;
5
54liuyao 已提交
2942
}
5
54liuyao 已提交
2943 2944

bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap) {
5
54liuyao 已提交
2945
  if (ts + gap >= pWin->skey && ts - gap <= pWin->ekey) {
5
54liuyao 已提交
2946 2947 2948 2949 2950
    return true;
  }
  return false;
}

5
54liuyao 已提交
2951 2952
bool isInWindow(SResultWindowInfo* pWinInfo, TSKEY ts, int64_t gap) {
  return isInTimeWindow(&pWinInfo->sessionWin.win, ts, gap);
5
54liuyao 已提交
2953 2954
}

5
54liuyao 已提交
2955 2956 2957 2958 2959
void getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
                         SSessionKey* pKey) {
  pKey->win.skey = startTs;
  pKey->win.ekey = endTs;
  pKey->groupId = groupId;
2960
  int32_t code = streamStateSessionGetKeyByRange(pAggSup->pState, pKey, pKey);
5
54liuyao 已提交
2961 2962
  if (code != TSDB_CODE_SUCCESS) {
    SET_SESSION_WIN_KEY_INVALID(pKey);
2963 2964 2965
  }
}

5
54liuyao 已提交
2966
bool isInvalidSessionWin(SResultWindowInfo* pWinInfo) { return pWinInfo->sessionWin.win.skey == 0; }
5
54liuyao 已提交
2967

5
54liuyao 已提交
2968 2969 2970
void setSessionOutputBuf(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
                         SResultWindowInfo* pCurWin) {
  pCurWin->sessionWin.groupId = groupId;
2971 2972
  pCurWin->sessionWin.win.skey = startTs;
  pCurWin->sessionWin.win.ekey = endTs;
5
54liuyao 已提交
2973
  int32_t size = pAggSup->resultRowSize;
2974 2975
  int32_t code =
      streamStateSessionAddIfNotExist(pAggSup->pState, &pCurWin->sessionWin, pAggSup->gap, &pCurWin->pOutputBuf, &size);
5
54liuyao 已提交
2976 2977 2978 2979 2980
  if (code == TSDB_CODE_SUCCESS) {
    pCurWin->isOutput = true;
  } else {
    pCurWin->sessionWin.win.skey = startTs;
    pCurWin->sessionWin.win.ekey = endTs;
5
54liuyao 已提交
2981
  }
5
54liuyao 已提交
2982
}
5
54liuyao 已提交
2983

5
54liuyao 已提交
2984 2985
int32_t getSessionWinBuf(SStreamAggSupporter* pAggSup, SStreamStateCur* pCur, SResultWindowInfo* pWinInfo) {
  int32_t size = 0;
2986
  int32_t code = streamStateSessionGetKVByCur(pCur, &pWinInfo->sessionWin, &pWinInfo->pOutputBuf, &size);
5
54liuyao 已提交
2987 2988
  if (code != TSDB_CODE_SUCCESS) {
    return code;
5
54liuyao 已提交
2989
  }
5
54liuyao 已提交
2990 2991 2992 2993 2994 2995
  streamStateCurNext(pAggSup->pState, pCur);
  return TSDB_CODE_SUCCESS;
}
void saveDeleteInfo(SArray* pWins, SSessionKey key) {
  // key.win.ekey = key.win.skey;
  taosArrayPush(pWins, &key);
5
54liuyao 已提交
2996 2997
}

5
54liuyao 已提交
2998 2999 3000 3001
void saveDeleteRes(SSHashObj* pStDelete, SSessionKey key) {
  key.win.ekey = key.win.skey;
  tSimpleHashPut(pStDelete, &key, sizeof(SSessionKey), NULL, 0);
}
3002

5
54liuyao 已提交
3003 3004 3005 3006 3007
static void removeSessionResult(SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey key) {
  key.win.ekey = key.win.skey;
  tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey));
  tSimpleHashRemove(pResMap, &key, sizeof(SSessionKey));
}
5
54liuyao 已提交
3008

5
54liuyao 已提交
3009 3010 3011 3012 3013
static void getSessionHashKey(const SSessionKey* pKey, SSessionKey* pHashKey) {
  *pHashKey = *pKey;
  pHashKey->win.ekey = pKey->win.skey;
}

5
54liuyao 已提交
3014 3015 3016
static void removeSessionResults(SSHashObj* pHashMap, SArray* pWins) {
  if (tSimpleHashGetSize(pHashMap) == 0) {
    return;
5
54liuyao 已提交
3017
  }
5
54liuyao 已提交
3018 3019 3020 3021
  int32_t size = taosArrayGetSize(pWins);
  for (int32_t i = 0; i < size; i++) {
    SSessionKey* pWin = taosArrayGet(pWins, i);
    if (!pWin) continue;
5
54liuyao 已提交
3022 3023
    SSessionKey key = {0};
    getSessionHashKey(pWin, &key);
5
54liuyao 已提交
3024
    tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey));
5
54liuyao 已提交
3025 3026 3027
  }
}

dengyihao's avatar
dengyihao 已提交
3028
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t groupId,
5
54liuyao 已提交
3029 3030
                                int32_t rows, int32_t start, int64_t gap, SSHashObj* pResultRows, SSHashObj* pStUpdated,
                                SSHashObj* pStDeleted) {
5
54liuyao 已提交
3031
  for (int32_t i = start; i < rows; ++i) {
3032
    if (!isInWindow(pWinInfo, pStartTs[i], gap) && (!pEndTs || !isInWindow(pWinInfo, pEndTs[i], gap))) {
5
54liuyao 已提交
3033 3034
      return i - start;
    }
5
54liuyao 已提交
3035
    if (pWinInfo->sessionWin.win.skey > pStartTs[i]) {
5
54liuyao 已提交
3036
      if (pStDeleted && pWinInfo->isOutput) {
5
54liuyao 已提交
3037
        saveDeleteRes(pStDeleted, pWinInfo->sessionWin);
5
54liuyao 已提交
3038
      }
5
54liuyao 已提交
3039 3040
      removeSessionResult(pStUpdated, pResultRows, pWinInfo->sessionWin);
      pWinInfo->sessionWin.win.skey = pStartTs[i];
5
54liuyao 已提交
3041
    }
5
54liuyao 已提交
3042
    pWinInfo->sessionWin.win.ekey = TMAX(pWinInfo->sessionWin.win.ekey, pStartTs[i]);
5
54liuyao 已提交
3043
    if (pEndTs) {
5
54liuyao 已提交
3044
      pWinInfo->sessionWin.win.ekey = TMAX(pWinInfo->sessionWin.win.ekey, pEndTs[i]);
5
54liuyao 已提交
3045 3046 3047 3048 3049
    }
  }
  return rows - start;
}

5
54liuyao 已提交
3050 3051
static int32_t initSessionOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pResult, SqlFunctionCtx* pCtx,
                                    int32_t numOfOutput, int32_t* rowEntryInfoOffset) {
3052
  ASSERT(pWinInfo->sessionWin.win.skey <= pWinInfo->sessionWin.win.ekey);
5
54liuyao 已提交
3053
  *pResult = (SResultRow*)pWinInfo->pOutputBuf;
5
54liuyao 已提交
3054
  // set time window for current result
5
54liuyao 已提交
3055
  (*pResult)->win = pWinInfo->sessionWin.win;
3056
  setResultRowInitCtx(*pResult, pCtx, numOfOutput, rowEntryInfoOffset);
5
54liuyao 已提交
3057 3058 3059
  return TSDB_CODE_SUCCESS;
}

5
54liuyao 已提交
3060 3061 3062
static int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindowInfo* pCurWin, SResultRow** pResult,
                                  int32_t startIndex, int32_t winRows, int32_t rows, int32_t numOutput,
                                  SOperatorInfo* pOperator) {
3063
  SExprSupp*     pSup = &pOperator->exprSupp;
3064
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
5
54liuyao 已提交
3065
  int32_t        code = initSessionOutputBuf(pCurWin, pResult, pSup->pCtx, numOutput, pSup->rowEntryInfoOffset);
5
54liuyao 已提交
3066
  if (code != TSDB_CODE_SUCCESS || (*pResult) == NULL) {
S
Shengliang Guan 已提交
3067
    return TSDB_CODE_OUT_OF_MEMORY;
5
54liuyao 已提交
3068
  }
5
54liuyao 已提交
3069
  updateTimeWindowInfo(pTimeWindowData, &pCurWin->sessionWin.win, false);
H
Haojun Liao 已提交
3070
  applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, pTimeWindowData, startIndex, winRows, rows, numOutput);
5
54liuyao 已提交
3071 3072 3073
  return TSDB_CODE_SUCCESS;
}

5
54liuyao 已提交
3074 3075
static bool doDeleteSessionWindow(SStreamAggSupporter* pAggSup, SSessionKey* pKey) {
  streamStateSessionDel(pAggSup->pState, pKey);
5
54liuyao 已提交
3076 3077 3078
  SSessionKey hashKey = {0};
  getSessionHashKey(pKey, &hashKey);
  tSimpleHashRemove(pAggSup->pResultRows, &hashKey, sizeof(SSessionKey));
5
54liuyao 已提交
3079 3080 3081 3082 3083 3084 3085 3086 3087 3088
  return true;
}

static int32_t setSessionWinOutputInfo(SSHashObj* pStUpdated, SResultWindowInfo* pWinInfo) {
  void* pVal = tSimpleHashGet(pStUpdated, &pWinInfo->sessionWin, sizeof(SSessionKey));
  if (pVal) {
    SResultWindowInfo* pWin = pVal;
    pWinInfo->isOutput = pWin->isOutput;
  }
  return TSDB_CODE_SUCCESS;
5
54liuyao 已提交
3089 3090
}

5
54liuyao 已提交
3091 3092 3093 3094 3095 3096 3097
SStreamStateCur* getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj* pStUpdated, SResultWindowInfo* pCurWin,
                                       SResultWindowInfo* pNextWin) {
  SStreamStateCur* pCur = streamStateSessionSeekKeyNext(pAggSup->pState, &pCurWin->sessionWin);
  pNextWin->isOutput = true;
  setSessionWinOutputInfo(pStUpdated, pNextWin);
  int32_t size = 0;
  pNextWin->sessionWin = pCurWin->sessionWin;
3098
  int32_t code = streamStateSessionGetKVByCur(pCur, &pNextWin->sessionWin, &pNextWin->pOutputBuf, &size);
5
54liuyao 已提交
3099 3100 3101 3102
  if (code != TSDB_CODE_SUCCESS) {
    SET_SESSION_WIN_INVALID(*pNextWin);
  }
  return pCur;
5
54liuyao 已提交
3103 3104
}

5
54liuyao 已提交
3105 3106 3107 3108 3109 3110 3111 3112 3113
static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCurWin, SSHashObj* pStUpdated,
                                 SSHashObj* pStDeleted) {
  SExprSupp*                     pSup = &pOperator->exprSupp;
  SExecTaskInfo*                 pTaskInfo = pOperator->pTaskInfo;
  SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
  SResultRow*                    pCurResult = NULL;
  int32_t                        numOfOutput = pOperator->exprSupp.numOfExprs;
  SStreamAggSupporter*           pAggSup = &pInfo->streamAggSup;
  initSessionOutputBuf(pCurWin, &pCurResult, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
5
54liuyao 已提交
3114
  // Just look for the window behind StartIndex
5
54liuyao 已提交
3115 3116 3117 3118 3119 3120
  while (1) {
    SResultWindowInfo winInfo = {0};
    SStreamStateCur*  pCur = getNextSessionWinInfo(pAggSup, pStUpdated, pCurWin, &winInfo);
    if (!IS_VALID_SESSION_WIN(winInfo) || !isInWindow(pCurWin, winInfo.sessionWin.win.skey, pAggSup->gap)) {
      streamStateFreeCur(pCur);
      break;
5
54liuyao 已提交
3121
    }
5
54liuyao 已提交
3122 3123 3124 3125 3126 3127 3128 3129 3130 3131 3132 3133
    SResultRow* pWinResult = NULL;
    initSessionOutputBuf(&winInfo, &pWinResult, pAggSup->pDummyCtx, numOfOutput, pSup->rowEntryInfoOffset);
    pCurWin->sessionWin.win.ekey = TMAX(pCurWin->sessionWin.win.ekey, winInfo.sessionWin.win.ekey);
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pCurWin->sessionWin.win, true);
    compactFunctions(pSup->pCtx, pAggSup->pDummyCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
    tSimpleHashRemove(pStUpdated, &winInfo.sessionWin, sizeof(SSessionKey));
    if (winInfo.isOutput && pStDeleted) {
      saveDeleteRes(pStDeleted, winInfo.sessionWin);
    }
    removeSessionResult(pStUpdated, pAggSup->pResultRows, winInfo.sessionWin);
    doDeleteSessionWindow(pAggSup, &winInfo.sessionWin);
    streamStateFreeCur(pCur);
5
54liuyao 已提交
3134 3135 3136
  }
}

5
54liuyao 已提交
3137 3138 3139
int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo) {
  saveSessionDiscBuf(pAggSup->pState, &pWinInfo->sessionWin, pWinInfo->pOutputBuf, pAggSup->resultRowSize);
  return TSDB_CODE_SUCCESS;
5
54liuyao 已提交
3140 3141
}

5
54liuyao 已提交
3142 3143
static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pStUpdated,
                                   SSHashObj* pStDeleted, bool hasEndTs) {
X
Xiaoyu Wang 已提交
3144
  SExecTaskInfo*                 pTaskInfo = pOperator->pTaskInfo;
5
54liuyao 已提交
3145
  SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
3146
  int32_t                        numOfOutput = pOperator->exprSupp.numOfExprs;
H
Haojun Liao 已提交
3147
  uint64_t                       groupId = pSDataBlock->info.id.groupId;
X
Xiaoyu Wang 已提交
3148
  int64_t                        code = TSDB_CODE_SUCCESS;
5
54liuyao 已提交
3149 3150 3151
  SResultRow*                    pResult = NULL;
  int32_t                        rows = pSDataBlock->info.rows;
  int32_t                        winRows = 0;
X
Xiaoyu Wang 已提交
3152

5
54liuyao 已提交
3153
  SColumnInfoData* pStartTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
5
54liuyao 已提交
3154
  TSKEY*           startTsCols = (int64_t*)pStartTsCol->pData;
5
54liuyao 已提交
3155 3156 3157
  SColumnInfoData* pEndTsCol = NULL;
  if (hasEndTs) {
    pEndTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->endTsIndex);
5
54liuyao 已提交
3158
  } else {
5
54liuyao 已提交
3159
    pEndTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
5
54liuyao 已提交
3160
  }
X
Xiaoyu Wang 已提交
3161

5
54liuyao 已提交
3162
  TSKEY*               endTsCols = (int64_t*)pEndTsCol->pData;
5
54liuyao 已提交
3163
  SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
5
54liuyao 已提交
3164
  for (int32_t i = 0; i < rows;) {
5
54liuyao 已提交
3165
    if (pInfo->ignoreExpiredData && isOverdue(endTsCols[i], &pInfo->twAggSup)) {
5
54liuyao 已提交
3166 3167 3168
      i++;
      continue;
    }
5
54liuyao 已提交
3169 3170 3171 3172 3173
    SResultWindowInfo winInfo = {0};
    setSessionOutputBuf(pAggSup, startTsCols[i], endTsCols[i], groupId, &winInfo);
    setSessionWinOutputInfo(pStUpdated, &winInfo);
    winRows = updateSessionWindowInfo(&winInfo, startTsCols, endTsCols, groupId, rows, i, pAggSup->gap,
                                      pAggSup->pResultRows, pStUpdated, pStDeleted);
5
54liuyao 已提交
3174 3175
    // coverity scan error
    if (!winInfo.pOutputBuf) {
S
Shengliang Guan 已提交
3176
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
5
54liuyao 已提交
3177
    }
L
Liu Jicong 已提交
3178

5
54liuyao 已提交
3179 3180
    code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &winInfo, &pResult, i, winRows, rows, numOfOutput,
                              pOperator);
5
54liuyao 已提交
3181
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
S
Shengliang Guan 已提交
3182
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
5
54liuyao 已提交
3183
    }
5
54liuyao 已提交
3184 3185
    compactSessionWindow(pOperator, &winInfo, pStUpdated, pStDeleted);
    saveSessionOutputBuf(pAggSup, &winInfo);
5
54liuyao 已提交
3186 3187

    if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pStUpdated) {
5
54liuyao 已提交
3188
      code = saveResult(winInfo, pStUpdated);
5
54liuyao 已提交
3189
      if (code != TSDB_CODE_SUCCESS) {
S
Shengliang Guan 已提交
3190
        T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
5
54liuyao 已提交
3191
      }
5
54liuyao 已提交
3192
    }
5
54liuyao 已提交
3193
    if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
5
54liuyao 已提交
3194 3195
      SSessionKey key = {0};
      getSessionHashKey(&winInfo.sessionWin, &key);
5
54liuyao 已提交
3196 3197 3198
      tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo));
    }

5
54liuyao 已提交
3199 3200 3201 3202
    i += winRows;
  }
}

5
54liuyao 已提交
3203
static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SArray* result) {
5
54liuyao 已提交
3204
  SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
3205
  TSKEY*           startDatas = (TSKEY*)pStartTsCol->pData;
5
54liuyao 已提交
3206
  SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
3207
  TSKEY*           endDatas = (TSKEY*)pEndTsCol->pData;
3208
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
3209
  uint64_t*        gpDatas = (uint64_t*)pGroupCol->pData;
5
54liuyao 已提交
3210
  for (int32_t i = 0; i < pBlock->info.rows; i++) {
5
54liuyao 已提交
3211 3212 3213 3214
    while (1) {
      SSessionKey curWin = {0};
      getCurSessionWindow(pAggSup, startDatas[i], endDatas[i], gpDatas[i], &curWin);
      if (IS_INVALID_SESSION_WIN_KEY(curWin)) {
3215 3216
        break;
      }
5
54liuyao 已提交
3217 3218 3219 3220
      doDeleteSessionWindow(pAggSup, &curWin);
      if (result) {
        saveDeleteInfo(result, curWin);
      }
3221
    }
5
54liuyao 已提交
3222 3223 3224
  }
}

5
54liuyao 已提交
3225 3226 3227 3228 3229 3230 3231 3232
static inline int32_t sessionKeyCompareAsc(const void* pKey1, const void* pKey2) {
  SSessionKey* pWin1 = (SSessionKey*)pKey1;
  SSessionKey* pWin2 = (SSessionKey*)pKey2;

  if (pWin1->groupId > pWin2->groupId) {
    return 1;
  } else if (pWin1->groupId < pWin2->groupId) {
    return -1;
5
54liuyao 已提交
3233 3234
  }

5
54liuyao 已提交
3235 3236 3237 3238 3239 3240 3241 3242 3243 3244 3245 3246 3247 3248 3249 3250 3251 3252 3253 3254 3255
  if (pWin1->win.skey > pWin2->win.skey) {
    return 1;
  } else if (pWin1->win.skey < pWin2->win.skey) {
    return -1;
  }

  return 0;
}

static int32_t copyUpdateResult(SSHashObj* pStUpdated, SArray* pUpdated) {
  void*   pIte = NULL;
  size_t  keyLen = 0;
  int32_t iter = 0;
  while ((pIte = tSimpleHashIterate(pStUpdated, pIte, &iter)) != NULL) {
    void* key = tSimpleHashGetKey(pIte, &keyLen);
    taosArrayPush(pUpdated, key);
  }
  taosArraySort(pUpdated, sessionKeyCompareAsc);
  return TSDB_CODE_SUCCESS;
}

3256
void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite) {
5
54liuyao 已提交
3257 3258 3259 3260
  blockDataCleanup(pBlock);
  int32_t size = tSimpleHashGetSize(pStDeleted);
  if (size == 0) {
    return;
3261 3262
  }
  blockDataEnsureCapacity(pBlock, size);
5
54liuyao 已提交
3263 3264 3265 3266 3267 3268 3269
  size_t  keyLen = 0;
  int32_t iter = 0;
  while (((*Ite) = tSimpleHashIterate(pStDeleted, *Ite, &iter)) != NULL) {
    if (pBlock->info.rows + 1 > pBlock->info.capacity) {
      break;
    }
    SSessionKey*     res = tSimpleHashGetKey(*Ite, &keyLen);
3270
    SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
5
54liuyao 已提交
3271
    colDataAppend(pStartTsCol, pBlock->info.rows, (const char*)&res->win.skey, false);
3272
    SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
5
54liuyao 已提交
3273
    colDataAppend(pEndTsCol, pBlock->info.rows, (const char*)&res->win.skey, false);
3274 3275
    SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
    colDataAppendNULL(pUidCol, pBlock->info.rows);
5
54liuyao 已提交
3276 3277
    SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
    colDataAppend(pGpCol, pBlock->info.rows, (const char*)&res->groupId, false);
3278 3279 3280 3281
    SColumnInfoData* pCalStCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
    colDataAppendNULL(pCalStCol, pBlock->info.rows);
    SColumnInfoData* pCalEdCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
    colDataAppendNULL(pCalEdCol, pBlock->info.rows);
3282 3283

    SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
3284 3285 3286

    void* tbname = NULL;
    streamStateGetParName(pOp->pTaskInfo->streamInfo.pState, res->groupId, &tbname);
3287 3288 3289 3290 3291 3292
    if (tbname == NULL) {
      colDataAppendNULL(pTableCol, pBlock->info.rows);
    } else {
      char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN];
      STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
      colDataAppend(pTableCol, pBlock->info.rows, (const char*)parTbName, false);
L
Liu Jicong 已提交
3293
      tdbFree(tbname);
3294
    }
5
54liuyao 已提交
3295 3296 3297
    pBlock->info.rows += 1;
  }
  if ((*Ite) == NULL) {
5
54liuyao 已提交
3298
    tSimpleHashClear(pStDeleted);
5
54liuyao 已提交
3299 3300 3301
  }
}

5
54liuyao 已提交
3302 3303 3304 3305 3306 3307 3308 3309
static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SSHashObj* pStUpdated) {
  SExprSupp*                     pSup = &pOperator->exprSupp;
  SExecTaskInfo*                 pTaskInfo = pOperator->pTaskInfo;
  int32_t                        size = taosArrayGetSize(pWinArray);
  SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
  SStreamAggSupporter*           pAggSup = &pInfo->streamAggSup;
  int32_t                        numOfOutput = pSup->numOfExprs;
  int32_t                        numOfChildren = taosArrayGetSize(pInfo->pChildren);
3310

3311
  for (int32_t i = 0; i < size; i++) {
5
54liuyao 已提交
3312 3313 3314
    SSessionKey*      pWinKey = taosArrayGet(pWinArray, i);
    int32_t           num = 0;
    SResultWindowInfo parentWin = {0};
3315
    for (int32_t j = 0; j < numOfChildren; j++) {
X
Xiaoyu Wang 已提交
3316
      SOperatorInfo*                 pChild = taosArrayGetP(pInfo->pChildren, j);
3317
      SStreamSessionAggOperatorInfo* pChInfo = pChild->info;
5
54liuyao 已提交
3318
      SStreamAggSupporter*           pChAggSup = &pChInfo->streamAggSup;
5
54liuyao 已提交
3319 3320
      SSessionKey                    chWinKey = {0};
      getSessionHashKey(pWinKey, &chWinKey);
3321 3322 3323
      SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pChAggSup->pState, &chWinKey);
      SResultRow*      pResult = NULL;
      SResultRow*      pChResult = NULL;
5
54liuyao 已提交
3324 3325 3326 3327 3328 3329 3330 3331 3332 3333 3334 3335
      while (1) {
        SResultWindowInfo childWin = {0};
        childWin.sessionWin = *pWinKey;
        int32_t code = getSessionWinBuf(pChAggSup, pCur, &childWin);
        if (code == TSDB_CODE_SUCCESS && pWinKey->win.skey <= childWin.sessionWin.win.skey &&
            childWin.sessionWin.win.ekey <= pWinKey->win.ekey) {
          if (num == 0) {
            setSessionOutputBuf(pAggSup, pWinKey->win.skey, pWinKey->win.ekey, pWinKey->groupId, &parentWin);
            code = initSessionOutputBuf(&parentWin, &pResult, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
            if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
              break;
            }
3336
          }
5
54liuyao 已提交
3337 3338 3339 3340 3341 3342 3343 3344
          num++;
          updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin.sessionWin.win, true);
          initSessionOutputBuf(&childWin, &pChResult, pChild->exprSupp.pCtx, numOfOutput,
                               pChild->exprSupp.rowEntryInfoOffset);
          compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
          compactSessionWindow(pOperator, &parentWin, pStUpdated, NULL);
          saveResult(parentWin, pStUpdated);
        } else {
5
54liuyao 已提交
3345
          break;
3346 3347
        }
      }
5
54liuyao 已提交
3348 3349 3350 3351
      streamStateFreeCur(pCur);
    }
    if (num > 0) {
      saveSessionOutputBuf(pAggSup, &parentWin);
3352 3353 3354 3355
    }
  }
}

5
54liuyao 已提交
3356 3357 3358 3359 3360 3361 3362 3363 3364 3365 3366
int32_t closeSessionWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SSHashObj* pClosed) {
  void*   pIte = NULL;
  size_t  keyLen = 0;
  int32_t iter = 0;
  while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
    SResultWindowInfo* pWinInfo = pIte;
    if (isCloseWindow(&pWinInfo->sessionWin.win, pTwSup)) {
      if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE && pClosed) {
        int32_t code = saveResult(*pWinInfo, pClosed);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
5
54liuyao 已提交
3367 3368
        }
      }
3369 3370
      SSessionKey* pKey = tSimpleHashGetKey(pIte, &keyLen);
      tSimpleHashIterateRemove(pHashMap, pKey, sizeof(SSessionKey), &pIte, &iter);
5
54liuyao 已提交
3371 3372 3373 3374 3375
    }
  }
  return TSDB_CODE_SUCCESS;
}

5
54liuyao 已提交
3376
static void closeChildSessionWindow(SArray* pChildren, TSKEY maxTs) {
5
54liuyao 已提交
3377 3378 3379 3380 3381
  int32_t size = taosArrayGetSize(pChildren);
  for (int32_t i = 0; i < size; i++) {
    SOperatorInfo*                 pChildOp = taosArrayGetP(pChildren, i);
    SStreamSessionAggOperatorInfo* pChInfo = pChildOp->info;
    pChInfo->twAggSup.maxTs = TMAX(pChInfo->twAggSup.maxTs, maxTs);
5
54liuyao 已提交
3382
    closeSessionWindow(pChInfo->streamAggSup.pResultRows, &pChInfo->twAggSup, NULL);
5
54liuyao 已提交
3383 3384 3385
  }
}

5
54liuyao 已提交
3386 3387 3388 3389
int32_t getAllSessionWindow(SSHashObj* pHashMap, SSHashObj* pStUpdated) {
  void*   pIte = NULL;
  int32_t iter = 0;
  while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
3390
    SResultWindowInfo* pWinInfo = pIte;
5
54liuyao 已提交
3391
    saveResult(*pWinInfo, pStUpdated);
5
54liuyao 已提交
3392 3393 3394 3395
  }
  return TSDB_CODE_SUCCESS;
}

5
54liuyao 已提交
3396
static void copyDeleteWindowInfo(SArray* pResWins, SSHashObj* pStDeleted) {
5
54liuyao 已提交
3397 3398
  int32_t size = taosArrayGetSize(pResWins);
  for (int32_t i = 0; i < size; i++) {
5
54liuyao 已提交
3399 3400
    SSessionKey* pWinKey = taosArrayGet(pResWins, i);
    if (!pWinKey) continue;
5
54liuyao 已提交
3401 3402
    SSessionKey winInfo = {0};
    getSessionHashKey(pWinKey, &winInfo);
5
54liuyao 已提交
3403
    tSimpleHashPut(pStDeleted, &winInfo, sizeof(SSessionKey), NULL, 0);
3404 3405 3406
  }
}

5
54liuyao 已提交
3407 3408 3409
void initGroupResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList) {
  pGroupResInfo->pRows = pArrayList;
  pGroupResInfo->index = 0;
3410 3411
}

5
54liuyao 已提交
3412 3413 3414 3415 3416 3417 3418 3419 3420 3421
void doBuildSessionResult(SOperatorInfo* pOperator, SStreamState* pState, SGroupResInfo* pGroupResInfo,
                          SSDataBlock* pBlock) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  // set output datablock version
  pBlock->info.version = pTaskInfo->version;

  blockDataCleanup(pBlock);
  if (!hasRemainResults(pGroupResInfo)) {
    taosArrayDestroy(pGroupResInfo->pRows);
    pGroupResInfo->pRows = NULL;
3422 3423 3424
    return;
  }

5
54liuyao 已提交
3425
  // clear the existed group id
H
Haojun Liao 已提交
3426
  pBlock->info.id.groupId = 0;
3427
  buildSessionResultDataBlock(pOperator, pState, pBlock, &pOperator->exprSupp, pGroupResInfo);
5
54liuyao 已提交
3428 3429
}

5
54liuyao 已提交
3430
static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
5
54liuyao 已提交
3431
  SExprSupp*                     pSup = &pOperator->exprSupp;
5
54liuyao 已提交
3432
  SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
X
Xiaoyu Wang 已提交
3433
  SOptrBasicInfo*                pBInfo = &pInfo->binfo;
5
54liuyao 已提交
3434
  TSKEY                          maxTs = INT64_MIN;
5
54liuyao 已提交
3435
  SStreamAggSupporter*           pAggSup = &pInfo->streamAggSup;
5
54liuyao 已提交
3436 3437 3438
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  } else if (pOperator->status == OP_RES_TO_RETURN) {
3439
    doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
5
54liuyao 已提交
3440
    if (pInfo->pDelRes->info.rows > 0) {
5
54liuyao 已提交
3441
      printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
5
54liuyao 已提交
3442 3443
      return pInfo->pDelRes;
    }
5
54liuyao 已提交
3444 3445 3446 3447
    doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes);
    if (pBInfo->pRes->info.rows > 0) {
      printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
      return pBInfo->pRes;
5
54liuyao 已提交
3448
    }
5
54liuyao 已提交
3449

H
Haojun Liao 已提交
3450
    setOperatorCompleted(pOperator);
5
54liuyao 已提交
3451
    return NULL;
5
54liuyao 已提交
3452 3453
  }

X
Xiaoyu Wang 已提交
3454
  _hash_fn_t     hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
5
54liuyao 已提交
3455
  SSHashObj*     pStUpdated = tSimpleHashInit(64, hashFn);
5
54liuyao 已提交
3456
  SOperatorInfo* downstream = pOperator->pDownstream[0];
5
54liuyao 已提交
3457
  SArray*        pUpdated = taosArrayInit(16, sizeof(SSessionKey));  // SResKeyPos
5
54liuyao 已提交
3458 3459 3460 3461 3462
  while (1) {
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
    if (pBlock == NULL) {
      break;
    }
5
54liuyao 已提交
3463
    printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "final session recv" : "single session recv");
3464

5
54liuyao 已提交
3465 3466 3467
    if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
        pBlock->info.type == STREAM_CLEAR) {
      SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
5
54liuyao 已提交
3468
      // gap must be 0
5
54liuyao 已提交
3469 3470
      doDeleteTimeWindows(pAggSup, pBlock, pWins);
      removeSessionResults(pStUpdated, pWins);
5
54liuyao 已提交
3471 3472 3473 3474 3475
      if (IS_FINAL_OP(pInfo)) {
        int32_t                        childIndex = getChildIndex(pBlock);
        SOperatorInfo*                 pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
        SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info;
        // gap must be 0
5
54liuyao 已提交
3476 3477
        doDeleteTimeWindows(&pChildInfo->streamAggSup, pBlock, NULL);
        rebuildSessionWindow(pOperator, pWins, pStUpdated);
5
54liuyao 已提交
3478 3479 3480 3481
      }
      copyDeleteWindowInfo(pWins, pInfo->pStDeleted);
      taosArrayDestroy(pWins);
      continue;
3482
    } else if (pBlock->info.type == STREAM_GET_ALL) {
5
54liuyao 已提交
3483
      getAllSessionWindow(pAggSup->pResultRows, pStUpdated);
5
54liuyao 已提交
3484
      continue;
5
54liuyao 已提交
3485
    }
5
54liuyao 已提交
3486

5
54liuyao 已提交
3487 3488 3489 3490
    if (pInfo->scalarSupp.pExprInfo != NULL) {
      SExprSupp* pExprSup = &pInfo->scalarSupp;
      projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
    }
3491
    // the pDataBlock are always the same one, no need to call this again
3492
    setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
5
54liuyao 已提交
3493 3494 3495 3496 3497 3498
    doStreamSessionAggImpl(pOperator, pBlock, pStUpdated, pInfo->pStDeleted, IS_FINAL_OP(pInfo));
    if (IS_FINAL_OP(pInfo)) {
      int32_t chIndex = getChildIndex(pBlock);
      int32_t size = taosArrayGetSize(pInfo->pChildren);
      // if chIndex + 1 - size > 0, add new child
      for (int32_t i = 0; i < chIndex + 1 - size; i++) {
3499 3500
        SOperatorInfo* pChildOp =
            createStreamFinalSessionAggOperatorInfo(NULL, pInfo->pPhyNode, pOperator->pTaskInfo, 0);
5
54liuyao 已提交
3501
        if (!pChildOp) {
S
Shengliang Guan 已提交
3502
          T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
5
54liuyao 已提交
3503 3504 3505
        }
        taosArrayPush(pInfo->pChildren, &pChildOp);
      }
3506
      SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex);
3507
      setInputDataBlock(&pChildOp->exprSupp, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
5
54liuyao 已提交
3508
      doStreamSessionAggImpl(pChildOp, pBlock, NULL, NULL, true);
3509
    }
5
54liuyao 已提交
3510
    maxTs = TMAX(maxTs, pBlock->info.window.ekey);
3511
    maxTs = TMAX(maxTs, pBlock->info.watermark);
5
54liuyao 已提交
3512
  }
5
54liuyao 已提交
3513 3514

  pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
5
54liuyao 已提交
3515 3516
  // restore the value
  pOperator->status = OP_RES_TO_RETURN;
H
Haojun Liao 已提交
3517

5
54liuyao 已提交
3518 3519
  closeSessionWindow(pAggSup->pResultRows, &pInfo->twAggSup, pStUpdated);
  closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs);
5
54liuyao 已提交
3520
  copyUpdateResult(pStUpdated, pUpdated);
5
54liuyao 已提交
3521 3522 3523
  removeSessionResults(pInfo->pStDeleted, pUpdated);
  tSimpleHashCleanup(pStUpdated);
  initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
5
54liuyao 已提交
3524
  blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
5
54liuyao 已提交
3525

3526 3527 3528 3529 3530 3531
#if 0
  char* pBuf = streamStateSessionDump(pAggSup->pState);
  qDebug("===stream===final session%s", pBuf);
  taosMemoryFree(pBuf);
#endif

3532
  doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
5
54liuyao 已提交
3533
  if (pInfo->pDelRes->info.rows > 0) {
5
54liuyao 已提交
3534
    printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
5
54liuyao 已提交
3535 3536
    return pInfo->pDelRes;
  }
5
54liuyao 已提交
3537 3538 3539 3540 3541 3542 3543

  doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes);
  if (pBInfo->pRes->info.rows > 0) {
    printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
    return pBInfo->pRes;
  }

H
Haojun Liao 已提交
3544
  setOperatorCompleted(pOperator);
5
54liuyao 已提交
3545
  return NULL;
5
54liuyao 已提交
3546 3547
}

5
54liuyao 已提交
3548 3549 3550 3551 3552 3553 3554 3555 3556 3557 3558 3559 3560 3561 3562 3563 3564 3565 3566 3567
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
                                                  SExecTaskInfo* pTaskInfo) {
  SSessionWinodwPhysiNode*       pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
  int32_t                        numOfCols = 0;
  int32_t                        code = TSDB_CODE_OUT_OF_MEMORY;
  SStreamSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamSessionAggOperatorInfo));
  SOperatorInfo*                 pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  pOperator->pTaskInfo = pTaskInfo;

  initResultSizeInfo(&pOperator->resultInfo, 4096);
  if (pSessionNode->window.pExprs != NULL) {
    int32_t    numOfScalar = 0;
    SExprInfo* pScalarExprInfo = createExprInfo(pSessionNode->window.pExprs, NULL, &numOfScalar);
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
5
54liuyao 已提交
3568 3569
    }
  }
5
54liuyao 已提交
3570 3571 3572
  SExprSupp* pSup = &pOperator->exprSupp;

  SExprInfo*   pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols);
H
Haojun Liao 已提交
3573
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
5
54liuyao 已提交
3574 3575 3576 3577 3578 3579 3580 3581 3582 3583 3584 3585 3586 3587 3588 3589 3590 3591 3592 3593 3594 3595 3596 3597 3598 3599 3600 3601 3602 3603 3604 3605 3606 3607 3608
  code = initBasicInfoEx(&pInfo->binfo, pSup, pExprInfo, numOfCols, pResBlock);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  code = initStreamAggSupporter(&pInfo->streamAggSup, pSup->pCtx, numOfCols, pSessionNode->gap,
                                pTaskInfo->streamInfo.pState, 0, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->twAggSup = (STimeWindowAggSupp){
      .waterMark = pSessionNode->window.watermark,
      .calTrigger = pSessionNode->window.triggerType,
      .maxTs = INT64_MIN,
      .minTs = INT64_MAX,
  };

  initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);

  pInfo->primaryTsIndex = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
  if (pSessionNode->window.pTsEnd) {
    pInfo->endTsIndex = ((SColumnNode*)pSessionNode->window.pTsEnd)->slotId;
  }
  pInfo->binfo.pRes = pResBlock;
  pInfo->order = TSDB_ORDER_ASC;
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  pInfo->pStDeleted = tSimpleHashInit(64, hashFn);
  pInfo->pDelIterator = NULL;
  pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
  pInfo->pChildren = NULL;
  pInfo->isFinal = false;
  pInfo->pPhyNode = pPhyNode;
  pInfo->ignoreExpiredData = pSessionNode->window.igExpired;

H
Haojun Liao 已提交
3609 3610
  setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true,
                  OP_NOT_OPENED, pInfo, pTaskInfo);
L
Liu Jicong 已提交
3611 3612
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo,
                                         optrDefaultBufFn, NULL);
H
Haojun Liao 已提交
3613

5
54liuyao 已提交
3614
  if (downstream) {
5
54liuyao 已提交
3615
    initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup);
5
54liuyao 已提交
3616 3617 3618 3619 3620 3621 3622 3623 3624 3625 3626 3627 3628 3629 3630 3631 3632
    code = appendDownstream(pOperator, &downstream, 1);
  }
  return pOperator;

_error:
  if (pInfo != NULL) {
    destroyStreamSessionAggOperatorInfo(pInfo);
  }

  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

static void clearStreamSessionOperator(SStreamSessionAggOperatorInfo* pInfo) {
  tSimpleHashClear(pInfo->streamAggSup.pResultRows);
  streamStateSessionClear(pInfo->streamAggSup.pState);
5
54liuyao 已提交
3633 3634 3635 3636 3637 3638 3639
}

static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
  SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
  SOptrBasicInfo*                pBInfo = &pInfo->binfo;
  TSKEY                          maxTs = INT64_MIN;
  SExprSupp*                     pSup = &pOperator->exprSupp;
5
54liuyao 已提交
3640
  SStreamAggSupporter*           pAggSup = &pInfo->streamAggSup;
3641

5
54liuyao 已提交
3642 3643
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
3644
  }
L
Liu Jicong 已提交
3645

3646
  {
5
54liuyao 已提交
3647
    doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes);
5
54liuyao 已提交
3648
    if (pBInfo->pRes->info.rows > 0) {
H
Haojun Liao 已提交
3649
      printDataBlock(pBInfo->pRes, "semi session");
5
54liuyao 已提交
3650 3651 3652
      return pBInfo->pRes;
    }

3653
    doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
3654
    if (pInfo->pDelRes->info.rows > 0) {
5
54liuyao 已提交
3655
      printDataBlock(pInfo->pDelRes, "semi session delete");
5
54liuyao 已提交
3656 3657
      return pInfo->pDelRes;
    }
5
54liuyao 已提交
3658

3659
    if (pOperator->status == OP_RES_TO_RETURN) {
5
54liuyao 已提交
3660
      clearFunctionContext(&pOperator->exprSupp);
3661 3662
      // semi interval operator clear disk buffer
      clearStreamSessionOperator(pInfo);
H
Haojun Liao 已提交
3663
      setOperatorCompleted(pOperator);
3664 3665
      return NULL;
    }
5
54liuyao 已提交
3666 3667 3668
  }

  _hash_fn_t     hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
5
54liuyao 已提交
3669
  SSHashObj*     pStUpdated = tSimpleHashInit(64, hashFn);
5
54liuyao 已提交
3670
  SOperatorInfo* downstream = pOperator->pDownstream[0];
5
54liuyao 已提交
3671
  SArray*        pUpdated = taosArrayInit(16, sizeof(SSessionKey));
5
54liuyao 已提交
3672 3673 3674
  while (1) {
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
    if (pBlock == NULL) {
5
54liuyao 已提交
3675
      clearSpecialDataBlock(pInfo->pUpdateRes);
3676
      pOperator->status = OP_RES_TO_RETURN;
5
54liuyao 已提交
3677 3678
      break;
    }
H
Haojun Liao 已提交
3679
    printDataBlock(pBlock, "semi session recv");
5
54liuyao 已提交
3680

5
54liuyao 已提交
3681 3682
    if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
        pBlock->info.type == STREAM_CLEAR) {
5
54liuyao 已提交
3683
      // gap must be 0
3684
      SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
5
54liuyao 已提交
3685
      doDeleteTimeWindows(&pInfo->streamAggSup, pBlock, pWins);
3686
      removeSessionResults(pStUpdated, pWins);
5
54liuyao 已提交
3687
      copyDeleteWindowInfo(pWins, pInfo->pStDeleted);
3688
      taosArrayDestroy(pWins);
5
54liuyao 已提交
3689
      break;
5
54liuyao 已提交
3690
    } else if (pBlock->info.type == STREAM_GET_ALL) {
5
54liuyao 已提交
3691
      getAllSessionWindow(pInfo->streamAggSup.pResultRows, pStUpdated);
5
54liuyao 已提交
3692 3693 3694
      continue;
    }

5
54liuyao 已提交
3695 3696 3697 3698
    if (pInfo->scalarSupp.pExprInfo != NULL) {
      SExprSupp* pExprSup = &pInfo->scalarSupp;
      projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
    }
5
54liuyao 已提交
3699
    // the pDataBlock are always the same one, no need to call this again
3700
    setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
3701
    doStreamSessionAggImpl(pOperator, pBlock, pStUpdated, NULL, false);
5
54liuyao 已提交
3702 3703 3704 3705
    maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
  }

  pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
3706
  pBInfo->pRes->info.watermark = pInfo->twAggSup.maxTs;
3707

5
54liuyao 已提交
3708
  copyUpdateResult(pStUpdated, pUpdated);
5
54liuyao 已提交
3709 3710
  removeSessionResults(pInfo->pStDeleted, pUpdated);
  tSimpleHashCleanup(pStUpdated);
5
54liuyao 已提交
3711

5
54liuyao 已提交
3712
  initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
5
54liuyao 已提交
3713
  blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
5
54liuyao 已提交
3714

3715 3716 3717 3718 3719 3720
#if 0
  char* pBuf = streamStateSessionDump(pAggSup->pState);
  qDebug("===stream===semi session%s", pBuf);
  taosMemoryFree(pBuf);
#endif

5
54liuyao 已提交
3721
  doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes);
5
54liuyao 已提交
3722
  if (pBInfo->pRes->info.rows > 0) {
H
Haojun Liao 已提交
3723
    printDataBlock(pBInfo->pRes, "semi session");
5
54liuyao 已提交
3724 3725 3726
    return pBInfo->pRes;
  }

3727
  doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
3728
  if (pInfo->pDelRes->info.rows > 0) {
5
54liuyao 已提交
3729
    printDataBlock(pInfo->pDelRes, "semi session delete");
5
54liuyao 已提交
3730 3731
    return pInfo->pDelRes;
  }
5
54liuyao 已提交
3732

5
54liuyao 已提交
3733 3734 3735
  clearFunctionContext(&pOperator->exprSupp);
  // semi interval operator clear disk buffer
  clearStreamSessionOperator(pInfo);
H
Haojun Liao 已提交
3736
  setOperatorCompleted(pOperator);
5
54liuyao 已提交
3737
  return NULL;
5
54liuyao 已提交
3738
}
3739

3740 3741
SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
                                                       SExecTaskInfo* pTaskInfo, int32_t numOfChild) {
3742 3743
  int32_t        code = TSDB_CODE_OUT_OF_MEMORY;
  SOperatorInfo* pOperator = createStreamSessionAggOperatorInfo(downstream, pPhyNode, pTaskInfo);
3744 3745 3746
  if (pOperator == NULL) {
    goto _error;
  }
H
Haojun Liao 已提交
3747

3748
  SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
5
54liuyao 已提交
3749

H
Haojun Liao 已提交
3750
  pInfo->isFinal = (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION);
L
Liu Jicong 已提交
3751
  char* name = (pInfo->isFinal) ? "StreamSessionFinalAggOperator" : "StreamSessionSemiAggOperator";
H
Haojun Liao 已提交
3752 3753

  if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
H
Haojun Liao 已提交
3754
    pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
5
54liuyao 已提交
3755
    blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
3756 3757
    pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAgg, NULL,
                                           destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL);
5
54liuyao 已提交
3758
  }
3759

L
Liu Jicong 已提交
3760
  setOperatorInfo(pOperator, name, pPhyNode->type, false, OP_NOT_OPENED, pInfo, pTaskInfo);
H
Haojun Liao 已提交
3761

5
54liuyao 已提交
3762 3763 3764 3765
  pOperator->operatorType = pPhyNode->type;
  if (numOfChild > 0) {
    pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*));
    for (int32_t i = 0; i < numOfChild; i++) {
5
54liuyao 已提交
3766 3767
      SOperatorInfo* pChildOp = createStreamFinalSessionAggOperatorInfo(NULL, pPhyNode, pTaskInfo, 0);
      if (pChildOp == NULL) {
5
54liuyao 已提交
3768 3769
        goto _error;
      }
5
54liuyao 已提交
3770 3771 3772 3773
      SStreamSessionAggOperatorInfo* pChInfo = pChildOp->info;
      pChInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
      streamStateSetNumber(pChInfo->streamAggSup.pState, i);
      taosArrayPush(pInfo->pChildren, &pChildOp);
3774 3775
    }
  }
3776 3777 3778 3779 3780

  if (!IS_FINAL_OP(pInfo) || numOfChild == 0) {
    pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
  }

3781 3782 3783 3784
  return pOperator;

_error:
  if (pInfo != NULL) {
3785
    destroyStreamSessionAggOperatorInfo(pInfo);
3786 3787 3788 3789 3790
  }
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}
5
54liuyao 已提交
3791

3792
void destroyStreamStateOperatorInfo(void* param) {
X
Xiaoyu Wang 已提交
3793
  SStreamStateAggOperatorInfo* pInfo = (SStreamStateAggOperatorInfo*)param;
3794
  cleanupBasicInfo(&pInfo->binfo);
5
54liuyao 已提交
3795
  destroyStreamAggSupporter(&pInfo->streamAggSup);
5
54liuyao 已提交
3796 3797 3798 3799
  cleanupGroupResInfo(&pInfo->groupResInfo);
  if (pInfo->pChildren != NULL) {
    int32_t size = taosArrayGetSize(pInfo->pChildren);
    for (int32_t i = 0; i < size; i++) {
5
54liuyao 已提交
3800 3801
      SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, i);
      destroyOperatorInfo(pChild);
5
54liuyao 已提交
3802
    }
5
54liuyao 已提交
3803
    taosArrayDestroy(pInfo->pChildren);
5
54liuyao 已提交
3804
  }
5
54liuyao 已提交
3805 3806
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
  blockDataDestroy(pInfo->pDelRes);
5
54liuyao 已提交
3807
  tSimpleHashCleanup(pInfo->pSeDeleted);
D
dapan1121 已提交
3808
  taosMemoryFreeClear(param);
5
54liuyao 已提交
3809 3810 3811
}

bool isTsInWindow(SStateWindowInfo* pWin, TSKEY ts) {
5
54liuyao 已提交
3812
  if (pWin->winInfo.sessionWin.win.skey <= ts && ts <= pWin->winInfo.sessionWin.win.ekey) {
5
54liuyao 已提交
3813 3814 3815 3816 3817 3818
    return true;
  }
  return false;
}

bool isEqualStateKey(SStateWindowInfo* pWin, char* pKeyData) {
5
54liuyao 已提交
3819 3820 3821 3822 3823 3824 3825 3826 3827 3828 3829 3830 3831 3832 3833 3834 3835 3836 3837 3838 3839 3840 3841 3842 3843 3844 3845 3846 3847 3848 3849 3850
  return pKeyData && compareVal(pKeyData, pWin->pStateKey);
}

bool compareStateKey(void* data, void* key) {
  SStateKeys* stateKey = (SStateKeys*)key;
  stateKey->pData = (char*)key + sizeof(SStateKeys);
  return compareVal(data, stateKey);
}

void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, char* pKeyData,
                       SStateWindowInfo* pCurWin, SStateWindowInfo* pNextWin) {
  int32_t size = pAggSup->resultRowSize;
  pCurWin->winInfo.sessionWin.groupId = groupId;
  pCurWin->winInfo.sessionWin.win.skey = ts;
  pCurWin->winInfo.sessionWin.win.ekey = ts;
  int32_t code =
      streamStateStateAddIfNotExist(pAggSup->pState, &pCurWin->winInfo.sessionWin, pKeyData, pAggSup->stateKeySize,
                                    compareStateKey, &pCurWin->winInfo.pOutputBuf, &size);
  pCurWin->pStateKey =
      (SStateKeys*)((char*)pCurWin->winInfo.pOutputBuf + (pAggSup->resultRowSize - pAggSup->stateKeySize));
  pCurWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys);
  pCurWin->pStateKey->type = pAggSup->stateKeyType;
  pCurWin->pStateKey->pData = (char*)pCurWin->pStateKey + sizeof(SStateKeys);
  pCurWin->pStateKey->isNull = false;

  if (code == TSDB_CODE_SUCCESS) {
    pCurWin->winInfo.isOutput = true;
  } else {
    if (IS_VAR_DATA_TYPE(pAggSup->stateKeyType)) {
      varDataCopy(pCurWin->pStateKey->pData, pKeyData);
    } else {
      memcpy(pCurWin->pStateKey->pData, pKeyData, pCurWin->pStateKey->bytes);
5
54liuyao 已提交
3851 3852 3853
    }
  }

5
54liuyao 已提交
3854 3855 3856 3857 3858 3859
  pNextWin->winInfo.sessionWin = pCurWin->winInfo.sessionWin;
  pNextWin->winInfo.pOutputBuf = NULL;
  SStreamStateCur* pCur = streamStateSessionSeekKeyNext(pAggSup->pState, &pCurWin->winInfo.sessionWin);
  code = streamStateSessionGetKVByCur(pCur, &pNextWin->winInfo.sessionWin, NULL, 0);
  if (code != TSDB_CODE_SUCCESS) {
    SET_SESSION_WIN_INVALID(pNextWin->winInfo);
5
54liuyao 已提交
3860
  }
5
54liuyao 已提交
3861
  streamStateFreeCur(pCur);
5
54liuyao 已提交
3862 3863
}

5
54liuyao 已提交
3864
int32_t updateStateWindowInfo(SStateWindowInfo* pWinInfo, SStateWindowInfo* pNextWin, TSKEY* pTs, uint64_t groupId,
H
Haojun Liao 已提交
3865
                              SColumnInfoData* pKeyCol, int32_t rows, int32_t start, bool* allEqual,
5
54liuyao 已提交
3866
                              SSHashObj* pResultRows, SSHashObj* pSeUpdated, SSHashObj* pSeDeleted) {
5
54liuyao 已提交
3867 3868 3869 3870
  *allEqual = true;
  for (int32_t i = start; i < rows; ++i) {
    char* pKeyData = colDataGetData(pKeyCol, i);
    if (!isTsInWindow(pWinInfo, pTs[i])) {
X
Xiaoyu Wang 已提交
3871
      if (isEqualStateKey(pWinInfo, pKeyData)) {
5
54liuyao 已提交
3872
        if (IS_VALID_SESSION_WIN(pNextWin->winInfo)) {
5
54liuyao 已提交
3873
          // ts belongs to the next window
5
54liuyao 已提交
3874
          if (pTs[i] >= pNextWin->winInfo.sessionWin.win.skey) {
5
54liuyao 已提交
3875 3876 3877 3878 3879 3880 3881
            return i - start;
          }
        }
      } else {
        return i - start;
      }
    }
5
54liuyao 已提交
3882 3883

    if (pWinInfo->winInfo.sessionWin.win.skey > pTs[i]) {
H
Haojun Liao 已提交
3884
      if (pSeDeleted && pWinInfo->winInfo.isOutput) {
5
54liuyao 已提交
3885
        saveDeleteRes(pSeDeleted, pWinInfo->winInfo.sessionWin);
5
54liuyao 已提交
3886
      }
5
54liuyao 已提交
3887 3888
      removeSessionResult(pSeUpdated, pResultRows, pWinInfo->winInfo.sessionWin);
      pWinInfo->winInfo.sessionWin.win.skey = pTs[i];
5
54liuyao 已提交
3889
    }
5
54liuyao 已提交
3890
    pWinInfo->winInfo.sessionWin.win.ekey = TMAX(pWinInfo->winInfo.sessionWin.win.ekey, pTs[i]);
5
54liuyao 已提交
3891 3892 3893 3894 3895 3896 3897
    if (!isEqualStateKey(pWinInfo, pKeyData)) {
      *allEqual = false;
    }
  }
  return rows - start;
}

5
54liuyao 已提交
3898 3899
static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pSeUpdated,
                                 SSHashObj* pStDeleted) {
X
Xiaoyu Wang 已提交
3900
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
5
54liuyao 已提交
3901
  SStreamStateAggOperatorInfo* pInfo = pOperator->info;
3902
  int32_t                      numOfOutput = pOperator->exprSupp.numOfExprs;
H
Haojun Liao 已提交
3903
  uint64_t                     groupId = pSDataBlock->info.id.groupId;
X
Xiaoyu Wang 已提交
3904 3905 3906 3907
  int64_t                      code = TSDB_CODE_SUCCESS;
  TSKEY*                       tsCols = NULL;
  SResultRow*                  pResult = NULL;
  int32_t                      winRows = 0;
5
54liuyao 已提交
3908
  if (pSDataBlock->pDataBlock != NULL) {
X
Xiaoyu Wang 已提交
3909 3910
    SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
    tsCols = (int64_t*)pColDataInfo->pData;
5
54liuyao 已提交
3911
  } else {
X
Xiaoyu Wang 已提交
3912
    return;
5
54liuyao 已提交
3913
  }
L
Liu Jicong 已提交
3914

5
54liuyao 已提交
3915
  SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
5
54liuyao 已提交
3916 3917
  int32_t              rows = pSDataBlock->info.rows;
  blockDataEnsureCapacity(pAggSup->pScanBlock, rows);
L
Liu Jicong 已提交
3918
  SColumnInfoData* pKeyColInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->stateCol.slotId);
5
54liuyao 已提交
3919
  for (int32_t i = 0; i < rows; i += winRows) {
5
54liuyao 已提交
3920
    if (pInfo->ignoreExpiredData && isOverdue(tsCols[i], &pInfo->twAggSup)) {
5
54liuyao 已提交
3921 3922 3923
      i++;
      continue;
    }
5
54liuyao 已提交
3924 3925 3926 3927 3928 3929 3930 3931 3932
    char*            pKeyData = colDataGetData(pKeyColInfo, i);
    int32_t          winIndex = 0;
    bool             allEqual = true;
    SStateWindowInfo curWin = {0};
    SStateWindowInfo nextWin = {0};
    setStateOutputBuf(pAggSup, tsCols[i], groupId, pKeyData, &curWin, &nextWin);
    setSessionWinOutputInfo(pSeUpdated, &curWin.winInfo);
    winRows = updateStateWindowInfo(&curWin, &nextWin, tsCols, groupId, pKeyColInfo, rows, i, &allEqual,
                                    pAggSup->pResultRows, pSeUpdated, pStDeleted);
5
54liuyao 已提交
3933
    if (!allEqual) {
3934
      uint64_t uid = 0;
5
54liuyao 已提交
3935 3936 3937 3938 3939
      appendOneRowToStreamSpecialBlock(pAggSup->pScanBlock, &curWin.winInfo.sessionWin.win.skey,
                                       &curWin.winInfo.sessionWin.win.ekey, &uid, &groupId, NULL);
      tSimpleHashRemove(pSeUpdated, &curWin.winInfo.sessionWin, sizeof(SSessionKey));
      doDeleteSessionWindow(pAggSup, &curWin.winInfo.sessionWin);
      releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)curWin.winInfo.pOutputBuf);
5
54liuyao 已提交
3940 3941
      continue;
    }
5
54liuyao 已提交
3942 3943
    code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &curWin.winInfo, &pResult, i, winRows, rows, numOfOutput,
                              pOperator);
5
54liuyao 已提交
3944
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
S
Shengliang Guan 已提交
3945
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
5
54liuyao 已提交
3946
    }
5
54liuyao 已提交
3947 3948
    saveSessionOutputBuf(pAggSup, &curWin.winInfo);

5
54liuyao 已提交
3949
    if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
5
54liuyao 已提交
3950
      code = saveResult(curWin.winInfo, pSeUpdated);
5
54liuyao 已提交
3951
      if (code != TSDB_CODE_SUCCESS) {
S
Shengliang Guan 已提交
3952
        T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
5
54liuyao 已提交
3953 3954
      }
    }
3955 3956

    if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
5
54liuyao 已提交
3957 3958
      SSessionKey key = {0};
      getSessionHashKey(&curWin.winInfo.sessionWin, &key);
3959 3960
      tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
    }
5
54liuyao 已提交
3961 3962 3963 3964 3965 3966 3967 3968
  }
}

static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

3969
  SExprSupp*                   pSup = &pOperator->exprSupp;
5
54liuyao 已提交
3970
  SStreamStateAggOperatorInfo* pInfo = pOperator->info;
X
Xiaoyu Wang 已提交
3971
  SOptrBasicInfo*              pBInfo = &pInfo->binfo;
L
Liu Jicong 已提交
3972
  int64_t                      maxTs = INT64_MIN;
5
54liuyao 已提交
3973
  if (pOperator->status == OP_RES_TO_RETURN) {
3974
    doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
5
54liuyao 已提交
3975
    if (pInfo->pDelRes->info.rows > 0) {
5
54liuyao 已提交
3976
      printDataBlock(pInfo->pDelRes, "single state delete");
5
54liuyao 已提交
3977 3978
      return pInfo->pDelRes;
    }
5
54liuyao 已提交
3979 3980 3981 3982 3983

    doBuildSessionResult(pOperator, pInfo->streamAggSup.pState, &pInfo->groupResInfo, pBInfo->pRes);
    if (pBInfo->pRes->info.rows > 0) {
      printDataBlock(pBInfo->pRes, "single state");
      return pBInfo->pRes;
5
54liuyao 已提交
3984
    }
5
54liuyao 已提交
3985

H
Haojun Liao 已提交
3986
    setOperatorCompleted(pOperator);
5
54liuyao 已提交
3987
    return NULL;
5
54liuyao 已提交
3988 3989
  }

X
Xiaoyu Wang 已提交
3990
  _hash_fn_t     hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
5
54liuyao 已提交
3991
  SSHashObj*     pSeUpdated = tSimpleHashInit(64, hashFn);
5
54liuyao 已提交
3992
  SOperatorInfo* downstream = pOperator->pDownstream[0];
5
54liuyao 已提交
3993
  SArray*        pUpdated = taosArrayInit(16, sizeof(SSessionKey));
5
54liuyao 已提交
3994 3995 3996 3997 3998
  while (1) {
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
    if (pBlock == NULL) {
      break;
    }
5
54liuyao 已提交
3999
    printDataBlock(pBlock, "single state recv");
4000

5
54liuyao 已提交
4001 4002 4003 4004
    if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
        pBlock->info.type == STREAM_CLEAR) {
      SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
      doDeleteTimeWindows(&pInfo->streamAggSup, pBlock, pWins);
4005
      removeSessionResults(pSeUpdated, pWins);
5
54liuyao 已提交
4006
      copyDeleteWindowInfo(pWins, pInfo->pSeDeleted);
4007 4008
      taosArrayDestroy(pWins);
      continue;
4009
    } else if (pBlock->info.type == STREAM_GET_ALL) {
5
54liuyao 已提交
4010
      getAllSessionWindow(pInfo->streamAggSup.pResultRows, pSeUpdated);
5
54liuyao 已提交
4011
      continue;
5
54liuyao 已提交
4012
    }
4013

5
54liuyao 已提交
4014 4015 4016 4017
    if (pInfo->scalarSupp.pExprInfo != NULL) {
      SExprSupp* pExprSup = &pInfo->scalarSupp;
      projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
    }
4018
    // the pDataBlock are always the same one, no need to call this again
4019
    setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
5
54liuyao 已提交
4020
    doStreamStateAggImpl(pOperator, pBlock, pSeUpdated, pInfo->pSeDeleted);
4021
    maxTs = TMAX(maxTs, pBlock->info.window.ekey);
5
54liuyao 已提交
4022
  }
4023
  pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
5
54liuyao 已提交
4024 4025
  // restore the value
  pOperator->status = OP_RES_TO_RETURN;
X
Xiaoyu Wang 已提交
4026

5
54liuyao 已提交
4027
  closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pSeUpdated);
5
54liuyao 已提交
4028
  copyUpdateResult(pSeUpdated, pUpdated);
5
54liuyao 已提交
4029 4030
  removeSessionResults(pInfo->pSeDeleted, pUpdated);
  tSimpleHashCleanup(pSeUpdated);
5
54liuyao 已提交
4031

5
54liuyao 已提交
4032
  initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
5
54liuyao 已提交
4033
  blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
5
54liuyao 已提交
4034

5
54liuyao 已提交
4035 4036 4037 4038 4039 4040
#if 0
  char* pBuf = streamStateSessionDump(pInfo->streamAggSup.pState);
  qDebug("===stream===final session%s", pBuf);
  taosMemoryFree(pBuf);
#endif

4041
  doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
5
54liuyao 已提交
4042
  if (pInfo->pDelRes->info.rows > 0) {
5
54liuyao 已提交
4043
    printDataBlock(pInfo->pDelRes, "single state delete");
5
54liuyao 已提交
4044 4045 4046
    return pInfo->pDelRes;
  }

5
54liuyao 已提交
4047 4048 4049 4050 4051
  doBuildSessionResult(pOperator, pInfo->streamAggSup.pState, &pInfo->groupResInfo, pBInfo->pRes);
  if (pBInfo->pRes->info.rows > 0) {
    printDataBlock(pBInfo->pRes, "single state");
    return pBInfo->pRes;
  }
H
Haojun Liao 已提交
4052
  setOperatorCompleted(pOperator);
5
54liuyao 已提交
4053
  return NULL;
4054 4055
}

X
Xiaoyu Wang 已提交
4056 4057 4058 4059 4060
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
                                                SExecTaskInfo* pTaskInfo) {
  SStreamStateWinodwPhysiNode* pStateNode = (SStreamStateWinodwPhysiNode*)pPhyNode;
  int32_t                      tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
  SColumnNode*                 pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr;
H
Haojun Liao 已提交
4061
  int32_t                      code = TSDB_CODE_SUCCESS;
5
54liuyao 已提交
4062

X
Xiaoyu Wang 已提交
4063 4064
  SStreamStateAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamStateAggOperatorInfo));
  SOperatorInfo*               pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
5
54liuyao 已提交
4065
  if (pInfo == NULL || pOperator == NULL) {
H
Haojun Liao 已提交
4066
    code = TSDB_CODE_OUT_OF_MEMORY;
5
54liuyao 已提交
4067 4068 4069 4070
    goto _error;
  }

  pInfo->stateCol = extractColumnFromColumnNode(pColNode);
4071
  initResultSizeInfo(&pOperator->resultInfo, 4096);
5
54liuyao 已提交
4072 4073 4074
  if (pStateNode->window.pExprs != NULL) {
    int32_t    numOfScalar = 0;
    SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalar);
H
Haojun Liao 已提交
4075
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar);
5
54liuyao 已提交
4076 4077 4078 4079 4080
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

X
Xiaoyu Wang 已提交
4081 4082
  pInfo->twAggSup = (STimeWindowAggSupp){
      .waterMark = pStateNode->window.watermark,
5
54liuyao 已提交
4083 4084
      .calTrigger = pStateNode->window.triggerType,
      .maxTs = INT64_MIN,
5
54liuyao 已提交
4085
      .minTs = INT64_MAX,
X
Xiaoyu Wang 已提交
4086
  };
4087

5
54liuyao 已提交
4088
  initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
4089

5
54liuyao 已提交
4090 4091 4092
  SExprSupp*   pSup = &pOperator->exprSupp;
  int32_t      numOfCols = 0;
  SExprInfo*   pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &numOfCols);
H
Haojun Liao 已提交
4093
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
4094
  code = initBasicInfoEx(&pInfo->binfo, pSup, pExprInfo, numOfCols, pResBlock);
5
54liuyao 已提交
4095 4096 4097
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
5
54liuyao 已提交
4098 4099 4100 4101
  int32_t keySize = sizeof(SStateKeys) + pColNode->node.resType.bytes;
  int16_t type = pColNode->node.resType.type;
  code = initStreamAggSupporter(&pInfo->streamAggSup, pSup->pCtx, numOfCols, 0, pTaskInfo->streamInfo.pState, keySize,
                                type);
5
54liuyao 已提交
4102 4103 4104 4105 4106 4107
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->primaryTsIndex = tsSlotId;
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
5
54liuyao 已提交
4108
  pInfo->pSeDeleted = tSimpleHashInit(64, hashFn);
5
54liuyao 已提交
4109
  pInfo->pDelIterator = NULL;
H
Haojun Liao 已提交
4110
  pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
5
54liuyao 已提交
4111
  pInfo->pChildren = NULL;
5
54liuyao 已提交
4112
  pInfo->ignoreExpiredData = pStateNode->window.igExpired;
5
54liuyao 已提交
4113

L
Liu Jicong 已提交
4114 4115
  setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
L
Liu Jicong 已提交
4116 4117
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo,
                                         optrDefaultBufFn, NULL);
5
54liuyao 已提交
4118
  initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup);
5
54liuyao 已提交
4119 4120 4121 4122 4123 4124 4125
  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
  return pOperator;

_error:
4126
  destroyStreamStateOperatorInfo(pInfo);
5
54liuyao 已提交
4127 4128 4129 4130
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}
4131

4132
void destroyMAIOperatorInfo(void* param) {
4133
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = (SMergeAlignedIntervalAggOperatorInfo*)param;
4134
  destroyIntervalOperatorInfo(miaInfo->intervalAggOperatorInfo);
D
dapan1121 已提交
4135
  taosMemoryFreeClear(param);
4136 4137
}

4138
static SResultRow* doSetSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, SAggSupporter* pSup) {
H
Haojun Liao 已提交
4139
  SResultRow* pResult = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
4140 4141 4142
  if (NULL == pResult) {
    return pResult;
  }
H
Haojun Liao 已提交
4143
  pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
4144 4145
  return pResult;
}
4146

4147 4148 4149 4150 4151 4152 4153 4154
static int32_t setSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult,
                                       SExprSupp* pExprSup, SAggSupporter* pAggSup) {
  if (*pResult == NULL) {
    *pResult = doSetSingleOutputTupleBuf(pResultRowInfo, pAggSup);
    if (*pResult == NULL) {
      return terrno;
    }
  }
4155

4156
  // set time window for current result
4157 4158
  (*pResult)->win = (*win);
  setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
4159
  return TSDB_CODE_SUCCESS;
4160 4161
}

4162
static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo,
4163
                                          SSDataBlock* pBlock, SSDataBlock* pResultBlock) {
4164
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
D
dapan1121 已提交
4165
  SIntervalAggOperatorInfo*             iaInfo = miaInfo->intervalAggOperatorInfo;
4166 4167

  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
4168
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;
4169
  SInterval*     pInterval = &iaInfo->interval;
4170

5
54liuyao 已提交
4171 4172
  int32_t  startPos = 0;
  int64_t* tsCols = extractTsCol(pBlock, iaInfo);
4173

4174 4175
  TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);

4176 4177
  // there is an result exists
  if (miaInfo->curTs != INT64_MIN) {
4178
    if (ts != miaInfo->curTs) {
4179
      finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
4180
      resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
4181
      miaInfo->curTs = ts;
4182
    }
4183 4184
  } else {
    miaInfo->curTs = ts;
4185 4186 4187
  }

  STimeWindow win = {0};
4188
  win.skey = miaInfo->curTs;
4189
  win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
4190

5
54liuyao 已提交
4191
  int32_t ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
4192 4193
  if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
    T_LONG_JMP(pTaskInfo->env, ret);
4194 4195
  }

4196 4197
  int32_t currPos = startPos;

4198
  STimeWindow currWin = win;
4199
  while (++currPos < pBlock->info.rows) {
4200
    if (tsCols[currPos] == miaInfo->curTs) {
4201
      continue;
4202 4203 4204
    }

    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true);
L
Liu Jicong 已提交
4205 4206
    applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
                                    currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
4207

4208
    finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
4209
    resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
4210
    miaInfo->curTs = tsCols[currPos];
4211

4212
    currWin.skey = miaInfo->curTs;
4213
    currWin.ekey = taosTimeAdd(currWin.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
4214 4215

    startPos = currPos;
5
54liuyao 已提交
4216
    ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
4217 4218
    if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
      T_LONG_JMP(pTaskInfo->env, ret);
4219
    }
4220 4221

    miaInfo->curTs = currWin.skey;
4222
  }
4223

4224
  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true);
H
Haojun Liao 已提交
4225
  applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos,
L
Liu Jicong 已提交
4226
                                  pBlock->info.rows, pSup->numOfExprs);
4227 4228
}

4229
static void cleanupAfterGroupResultGen(SMergeAlignedIntervalAggOperatorInfo* pMiaInfo, SSDataBlock* pRes) {
H
Haojun Liao 已提交
4230
  pRes->info.id.groupId = pMiaInfo->groupId;
4231 4232
  pMiaInfo->curTs = INT64_MIN;
  pMiaInfo->groupId = 0;
4233 4234
}

4235
static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
S
shenglian zhou 已提交
4236
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
4237

4238 4239
  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
  SIntervalAggOperatorInfo*             pIaInfo = pMiaInfo->intervalAggOperatorInfo;
4240

4241 4242 4243 4244 4245
  SExprSupp*      pSup = &pOperator->exprSupp;
  SSDataBlock*    pRes = pIaInfo->binfo.pRes;
  SResultRowInfo* pResultRowInfo = &pIaInfo->binfo.resultRowInfo;
  SOperatorInfo*  downstream = pOperator->pDownstream[0];
  int32_t         scanFlag = MAIN_SCAN;
4246

4247 4248
  while (1) {
    SSDataBlock* pBlock = NULL;
4249
    if (pMiaInfo->prefetchedBlock == NULL) {
4250 4251
      pBlock = downstream->fpSet.getNextFn(downstream);
    } else {
4252 4253
      pBlock = pMiaInfo->prefetchedBlock;
      pMiaInfo->prefetchedBlock = NULL;
4254

H
Haojun Liao 已提交
4255
      pMiaInfo->groupId = pBlock->info.id.groupId;
4256
    }
4257

4258
    // no data exists, all query processing is done
4259
    if (pBlock == NULL) {
4260 4261 4262
      // close last unclosed time window
      if (pMiaInfo->curTs != INT64_MIN) {
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
4263 4264
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
4265
      }
4266

H
Haojun Liao 已提交
4267
      setOperatorCompleted(pOperator);
4268
      break;
4269
    }
4270

H
Haojun Liao 已提交
4271
    if (pMiaInfo->groupId == 0) {
H
Haojun Liao 已提交
4272 4273 4274
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
        pMiaInfo->groupId = pBlock->info.id.groupId;
        pRes->info.id.groupId = pMiaInfo->groupId;
H
Haojun Liao 已提交
4275 4276
      }
    } else {
H
Haojun Liao 已提交
4277
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
H
Haojun Liao 已提交
4278
        // if there are unclosed time window, close it firstly.
4279
        ASSERT(pMiaInfo->curTs != INT64_MIN);
H
Haojun Liao 已提交
4280
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
4281
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
H
Haojun Liao 已提交
4282

4283 4284
        pMiaInfo->prefetchedBlock = pBlock;
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
H
Haojun Liao 已提交
4285
        break;
5
54liuyao 已提交
4286
      } else {
H
Haojun Liao 已提交
4287
        // continue
H
Haojun Liao 已提交
4288
        pRes->info.id.groupId = pMiaInfo->groupId;
H
Haojun Liao 已提交
4289
      }
4290
    }
4291

4292
    getTableScanInfo(pOperator, &pIaInfo->inputOrder, &scanFlag);
4293
    setInputDataBlock(pSup, pBlock, pIaInfo->inputOrder, scanFlag, true);
4294
    doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes);
4295

H
Haojun Liao 已提交
4296
    doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
4297 4298 4299
    if (pRes->info.rows >= pOperator->resultInfo.capacity) {
      break;
    }
4300
  }
4301 4302 4303 4304 4305 4306 4307 4308 4309 4310 4311 4312 4313 4314 4315
}

static SSDataBlock* mergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
  SIntervalAggOperatorInfo*             iaInfo = pMiaInfo->intervalAggOperatorInfo;
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SSDataBlock* pRes = iaInfo->binfo.pRes;
  blockDataCleanup(pRes);

  if (iaInfo->binfo.mergeResultBlock) {
dengyihao's avatar
dengyihao 已提交
4316
    while (1) {
4317
      if (pOperator->status == OP_EXEC_DONE) {
4318 4319
        break;
      }
4320

4321
      if (pRes->info.rows >= pOperator->resultInfo.threshold) {
4322 4323 4324
        break;
      }

4325 4326 4327 4328
      doMergeAlignedIntervalAgg(pOperator);
    }
  } else {
    doMergeAlignedIntervalAgg(pOperator);
4329 4330 4331 4332 4333 4334 4335
  }

  size_t rows = pRes->info.rows;
  pOperator->resultInfo.totalRows += rows;
  return (rows == 0) ? NULL : pRes;
}

4336
SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode,
4337
                                                      SExecTaskInfo* pTaskInfo) {
4338
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedIntervalAggOperatorInfo));
4339
  SOperatorInfo*                        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
4340 4341 4342 4343
  if (miaInfo == NULL || pOperator == NULL) {
    goto _error;
  }

D
dapan1121 已提交
4344 4345 4346 4347 4348
  miaInfo->intervalAggOperatorInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
  if (miaInfo->intervalAggOperatorInfo == NULL) {
    goto _error;
  }

4349 4350 4351 4352 4353 4354 4355
  SInterval interval = {.interval = pNode->interval,
                        .sliding = pNode->sliding,
                        .intervalUnit = pNode->intervalUnit,
                        .slidingUnit = pNode->slidingUnit,
                        .offset = pNode->offset,
                        .precision = ((SColumnNode*)pNode->window.pTspk)->node.resType.precision};

D
dapan1121 已提交
4356
  SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
4357
  SExprSupp*                pSup = &pOperator->exprSupp;
4358

H
Haojun Liao 已提交
4359 4360 4361 4362 4363
  int32_t code = filterInitFromNode((SNode*)pNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

L
Liu Jicong 已提交
4364 4365 4366 4367
  miaInfo->curTs = INT64_MIN;
  iaInfo->win = pTaskInfo->window;
  iaInfo->inputOrder = TSDB_ORDER_ASC;
  iaInfo->interval = interval;
4368 4369
  iaInfo->primaryTsIndex = ((SColumnNode*)pNode->window.pTspk)->slotId;
  iaInfo->binfo.mergeResultBlock = pNode->window.mergeDataBlock;
4370 4371

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
H
Haojun Liao 已提交
4372
  initResultSizeInfo(&pOperator->resultInfo, 512);
4373

H
Haojun Liao 已提交
4374 4375
  int32_t    num = 0;
  SExprInfo* pExprInfo = createExprInfo(pNode->window.pFuncs, NULL, &num);
H
Haojun Liao 已提交
4376

L
Liu Jicong 已提交
4377 4378
  code = initAggSup(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
                    pTaskInfo->streamInfo.pState);
4379 4380 4381
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
4382

H
Haojun Liao 已提交
4383
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pNode->window.node.pOutputDataBlockDesc);
4384
  initBasicInfo(&iaInfo->binfo, pResBlock);
4385
  initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);
4386

4387
  iaInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, num, iaInfo);
4388
  if (iaInfo->timeWindowInterpo) {
4389
    iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
4390 4391
  }

4392
  initResultRowInfo(&iaInfo->binfo.resultRowInfo);
4393
  blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity);
L
Liu Jicong 已提交
4394 4395
  setOperatorInfo(pOperator, "TimeMergeAlignedIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL,
                  false, OP_NOT_OPENED, miaInfo, pTaskInfo);
4396

L
Liu Jicong 已提交
4397 4398
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignedIntervalAgg, NULL, destroyMAIOperatorInfo,
                                         optrDefaultBufFn, NULL);
4399 4400 4401 4402 4403 4404 4405 4406 4407

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
4408
  destroyMAIOperatorInfo(miaInfo);
4409 4410 4411 4412
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}
4413 4414 4415 4416 4417

//=====================================================================================================================
// merge interval operator
typedef struct SMergeIntervalAggOperatorInfo {
  SIntervalAggOperatorInfo intervalAggOperatorInfo;
L
Liu Jicong 已提交
4418 4419 4420 4421 4422 4423
  SList*                   groupIntervals;
  SListIter                groupIntervalsIter;
  bool                     hasGroupId;
  uint64_t                 groupId;
  SSDataBlock*             prefetchedBlock;
  bool                     inputBlocksFinished;
4424 4425
} SMergeIntervalAggOperatorInfo;

S
slzhou 已提交
4426
typedef struct SGroupTimeWindow {
L
Liu Jicong 已提交
4427
  uint64_t    groupId;
S
slzhou 已提交
4428 4429 4430
  STimeWindow window;
} SGroupTimeWindow;

4431
void destroyMergeIntervalOperatorInfo(void* param) {
4432
  SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
S
slzhou 已提交
4433
  tdListFree(miaInfo->groupIntervals);
4434
  destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo);
4435

D
dapan1121 已提交
4436
  taosMemoryFreeClear(param);
4437 4438
}

L
Liu Jicong 已提交
4439 4440
static int32_t finalizeWindowResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, STimeWindow* win,
                                    SSDataBlock* pResultBlock) {
4441 4442 4443
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
  SExecTaskInfo*                 pTaskInfo = pOperatorInfo->pTaskInfo;
4444
  bool                           ascScan = (iaInfo->inputOrder == TSDB_ORDER_ASC);
4445 4446 4447
  SExprSupp*                     pExprSup = &pOperatorInfo->exprSupp;

  SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &win->skey, TSDB_KEYSIZE, tableGroupId);
L
Liu Jicong 已提交
4448 4449
  SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(
      iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
4450
  ASSERT(p1 != NULL);
5
54liuyao 已提交
4451
  //  finalizeResultRows(iaInfo->aggSup.pResultBuf, p1, pResultBlock, pTaskInfo);
4452
  tSimpleHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
4453 4454 4455
  return TSDB_CODE_SUCCESS;
}

4456 4457 4458 4459
static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock,
                                        STimeWindow* newWin) {
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
4460
  bool                           ascScan = (iaInfo->inputOrder == TSDB_ORDER_ASC);
4461

S
slzhou 已提交
4462 4463
  SGroupTimeWindow groupTimeWindow = {.groupId = tableGroupId, .window = *newWin};
  tdListAppend(miaInfo->groupIntervals, &groupTimeWindow);
4464

S
slzhou 已提交
4465 4466 4467 4468 4469
  SListIter iter = {0};
  tdListInitIter(miaInfo->groupIntervals, &iter, TD_LIST_FORWARD);
  SListNode* listNode = NULL;
  while ((listNode = tdListNext(&iter)) != NULL) {
    SGroupTimeWindow* prevGrpWin = (SGroupTimeWindow*)listNode->data;
L
Liu Jicong 已提交
4470
    if (prevGrpWin->groupId != tableGroupId) {
S
slzhou 已提交
4471 4472
      continue;
    }
4473

S
slzhou 已提交
4474
    STimeWindow* prevWin = &prevGrpWin->window;
H
Haojun Liao 已提交
4475
    if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) {
5
54liuyao 已提交
4476
      //      finalizeWindowResult(pOperatorInfo, tableGroupId, prevWin, pResultBlock);
S
slzhou 已提交
4477 4478
      tdListPopNode(miaInfo->groupIntervals, listNode);
    }
4479 4480 4481 4482 4483 4484 4485 4486 4487 4488 4489 4490 4491 4492 4493 4494
  }

  return 0;
}

static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
                                   int32_t scanFlag, SSDataBlock* pResultBlock) {
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;

  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
  SExprSupp*     pExprSup = &pOperatorInfo->exprSupp;

  int32_t     startPos = 0;
  int32_t     numOfOutput = pExprSup->numOfExprs;
  int64_t*    tsCols = extractTsCol(pBlock, iaInfo);
H
Haojun Liao 已提交
4495
  uint64_t    tableGroupId = pBlock->info.id.groupId;
4496
  bool        ascScan = (iaInfo->inputOrder == TSDB_ORDER_ASC);
4497 4498 4499
  TSKEY       blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
  SResultRow* pResult = NULL;

4500 4501
  STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval,
                                        iaInfo->inputOrder);
4502 4503 4504 4505 4506

  int32_t ret =
      setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
                             numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
  if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
S
Shengliang Guan 已提交
4507
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
4508 4509 4510 4511
  }

  TSKEY   ekey = ascScan ? win.ekey : win.skey;
  int32_t forwardRows =
4512
      getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->inputOrder);
4513
  ASSERT(forwardRows > 0);
4514 4515 4516

  // prev time window not interpolation yet.
  if (iaInfo->timeWindowInterpo) {
4517
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId);
4518 4519 4520 4521 4522 4523
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);

    // restore current time window
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
                                 numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
    if (ret != TSDB_CODE_SUCCESS) {
S
Shengliang Guan 已提交
4524
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
4525 4526 4527 4528 4529 4530 4531
    }

    // window start key interpolation
    doWindowBorderInterpolation(iaInfo, pBlock, pResult, &win, startPos, forwardRows, pExprSup);
  }

  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, true);
H
Haojun Liao 已提交
4532
  applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows,
L
Liu Jicong 已提交
4533
                                  pBlock->info.rows, numOfOutput);
4534 4535 4536 4537 4538 4539 4540 4541
  doCloseWindow(pResultRowInfo, iaInfo, pResult);

  // output previous interval results after this interval (&win) is closed
  outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &win);

  STimeWindow nextWin = win;
  while (1) {
    int32_t prevEndPos = forwardRows - 1 + startPos;
4542 4543
    startPos =
        getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, iaInfo->inputOrder);
4544 4545 4546 4547 4548 4549 4550 4551 4552
    if (startPos < 0) {
      break;
    }

    // null data, failed to allocate more memory buffer
    int32_t code =
        setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
                               pExprSup->pCtx, numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
S
Shengliang Guan 已提交
4553
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
4554 4555 4556 4557
    }

    ekey = ascScan ? nextWin.ekey : nextWin.skey;
    forwardRows =
4558
        getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->inputOrder);
4559 4560 4561 4562 4563

    // window start(end) key interpolation
    doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup);

    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, true);
H
Haojun Liao 已提交
4564
    applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows,
L
Liu Jicong 已提交
4565
                                    pBlock->info.rows, numOfOutput);
4566 4567 4568 4569 4570 4571 4572 4573 4574 4575 4576 4577 4578 4579 4580 4581 4582 4583 4584 4585 4586 4587 4588 4589 4590 4591 4592 4593 4594 4595 4596 4597 4598 4599 4600
    doCloseWindow(pResultRowInfo, iaInfo, pResult);

    // output previous interval results after this interval (&nextWin) is closed
    outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &nextWin);
  }

  if (iaInfo->timeWindowInterpo) {
    saveDataBlockLastRow(iaInfo->pPrevValues, pBlock, iaInfo->pInterpCols);
  }
}

static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  SMergeIntervalAggOperatorInfo* miaInfo = pOperator->info;
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
  SExprSupp*                     pExpSupp = &pOperator->exprSupp;

  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SSDataBlock* pRes = iaInfo->binfo.pRes;
  blockDataCleanup(pRes);
  blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);

  if (!miaInfo->inputBlocksFinished) {
    SOperatorInfo* downstream = pOperator->pDownstream[0];
    int32_t        scanFlag = MAIN_SCAN;
    while (1) {
      SSDataBlock* pBlock = NULL;
      if (miaInfo->prefetchedBlock == NULL) {
        pBlock = downstream->fpSet.getNextFn(downstream);
      } else {
        pBlock = miaInfo->prefetchedBlock;
H
Haojun Liao 已提交
4601
        miaInfo->groupId = pBlock->info.id.groupId;
4602 4603 4604 4605
        miaInfo->prefetchedBlock = NULL;
      }

      if (pBlock == NULL) {
S
slzhou 已提交
4606
        tdListInitIter(miaInfo->groupIntervals, &miaInfo->groupIntervalsIter, TD_LIST_FORWARD);
4607 4608 4609 4610 4611 4612
        miaInfo->inputBlocksFinished = true;
        break;
      }

      if (!miaInfo->hasGroupId) {
        miaInfo->hasGroupId = true;
H
Haojun Liao 已提交
4613 4614
        miaInfo->groupId = pBlock->info.id.groupId;
      } else if (miaInfo->groupId != pBlock->info.id.groupId) {
4615 4616 4617 4618
        miaInfo->prefetchedBlock = pBlock;
        break;
      }

4619
      getTableScanInfo(pOperator, &iaInfo->inputOrder, &scanFlag);
4620
      setInputDataBlock(pExpSupp, pBlock, iaInfo->inputOrder, scanFlag, true);
4621 4622 4623 4624 4625 4626 4627
      doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes);

      if (pRes->info.rows >= pOperator->resultInfo.threshold) {
        break;
      }
    }

H
Haojun Liao 已提交
4628
    pRes->info.id.groupId = miaInfo->groupId;
4629 4630 4631
  }

  if (miaInfo->inputBlocksFinished) {
S
slzhou 已提交
4632
    SListNode* listNode = tdListNext(&miaInfo->groupIntervalsIter);
4633

S
slzhou 已提交
4634 4635
    if (listNode != NULL) {
      SGroupTimeWindow* grpWin = (SGroupTimeWindow*)(listNode->data);
5
54liuyao 已提交
4636
      //      finalizeWindowResult(pOperator, grpWin->groupId, &grpWin->window, pRes);
H
Haojun Liao 已提交
4637
      pRes->info.id.groupId = grpWin->groupId;
4638 4639 4640 4641
    }
  }

  if (pRes->info.rows == 0) {
H
Haojun Liao 已提交
4642
    setOperatorCompleted(pOperator);
4643 4644 4645 4646 4647 4648 4649
  }

  size_t rows = pRes->info.rows;
  pOperator->resultInfo.totalRows += rows;
  return (rows == 0) ? NULL : pRes;
}

4650 4651 4652
SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode,
                                               SExecTaskInfo* pTaskInfo) {
  SMergeIntervalAggOperatorInfo* pMergeIntervalInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo));
4653
  SOperatorInfo*                 pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
4654
  if (pMergeIntervalInfo == NULL || pOperator == NULL) {
4655 4656 4657
    goto _error;
  }

5
54liuyao 已提交
4658 4659
  int32_t    num = 0;
  SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
4660 4661 4662 4663 4664 4665 4666

  SInterval interval = {.interval = pIntervalPhyNode->interval,
                        .sliding = pIntervalPhyNode->sliding,
                        .intervalUnit = pIntervalPhyNode->intervalUnit,
                        .slidingUnit = pIntervalPhyNode->slidingUnit,
                        .offset = pIntervalPhyNode->offset,
                        .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
4667

4668
  pMergeIntervalInfo->groupIntervals = tdListNew(sizeof(SGroupTimeWindow));
4669

4670
  SIntervalAggOperatorInfo* pIntervalInfo = &pMergeIntervalInfo->intervalAggOperatorInfo;
L
Liu Jicong 已提交
4671
  pIntervalInfo->win = pTaskInfo->window;
4672
  pIntervalInfo->inputOrder = TSDB_ORDER_ASC;
L
Liu Jicong 已提交
4673
  pIntervalInfo->interval = interval;
4674 4675
  pIntervalInfo->binfo.mergeResultBlock = pIntervalPhyNode->window.mergeDataBlock;
  pIntervalInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
4676 4677 4678 4679

  SExprSupp* pExprSupp = &pOperator->exprSupp;

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
4680
  initResultSizeInfo(&pOperator->resultInfo, 4096);
4681

L
Liu Jicong 已提交
4682 4683
  int32_t code = initAggSup(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
                            pTaskInfo->streamInfo.pState);
4684 4685 4686
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
4687

H
Haojun Liao 已提交
4688
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pIntervalPhyNode->window.node.pOutputDataBlockDesc);
4689 4690
  initBasicInfo(&pIntervalInfo->binfo, pResBlock);
  initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win);
4691

4692 4693
  pIntervalInfo->timeWindowInterpo = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo);
  if (pIntervalInfo->timeWindowInterpo) {
4694
    pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
4695
    if (pIntervalInfo->binfo.resultRowInfo.openWindow == NULL) {
4696 4697 4698 4699
      goto _error;
    }
  }

4700
  initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo);
L
Liu Jicong 已提交
4701 4702
  setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false,
                  OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo);
L
Liu Jicong 已提交
4703 4704
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeIntervalAgg, NULL, destroyMergeIntervalOperatorInfo,
                                         optrDefaultBufFn, NULL);
4705 4706 4707 4708 4709 4710 4711 4712 4713

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
H
Haojun Liao 已提交
4714 4715 4716 4717
  if (pMergeIntervalInfo != NULL) {
    destroyMergeIntervalOperatorInfo(pMergeIntervalInfo);
  }

4718 4719 4720
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
L
Liu Jicong 已提交
4721
}
4722 4723 4724

static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
  SStreamIntervalOperatorInfo* pInfo = pOperator->info;
4725 4726
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
  int64_t                      maxTs = INT64_MIN;
5
54liuyao 已提交
4727
  int64_t                      minTs = INT64_MAX;
4728
  SExprSupp*                   pSup = &pOperator->exprSupp;
4729 4730 4731 4732 4733 4734

  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  if (pOperator->status == OP_RES_TO_RETURN) {
4735
    doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
4736
    if (pInfo->pDelRes->info.rows > 0) {
5
54liuyao 已提交
4737
      printDataBlock(pInfo->pDelRes, "single interval delete");
4738 4739 4740
      return pInfo->pDelRes;
    }

4741
    doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
5
54liuyao 已提交
4742 4743 4744
    if (pInfo->binfo.pRes->info.rows > 0) {
      printDataBlock(pInfo->binfo.pRes, "single interval");
      return pInfo->binfo.pRes;
4745
    }
4746 4747
    deleteIntervalDiscBuf(pInfo->pState, NULL, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark, &pInfo->interval,
                          &pInfo->delKey);
H
Haojun Liao 已提交
4748
    setOperatorCompleted(pOperator);
L
Liu Jicong 已提交
4749
    streamStateCommit(pTaskInfo->streamInfo.pState);
5
54liuyao 已提交
4750
    return NULL;
4751 4752 4753 4754 4755 4756 4757 4758 4759 4760 4761
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];

  SArray*    pUpdated = taosArrayInit(4, POINTER_BYTES);  // SResKeyPos
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  SHashObj*  pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);

  while (1) {
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
    if (pBlock == NULL) {
5
54liuyao 已提交
4762 4763
      qDebug("===stream===return data:single interval. recv datablock num:%" PRIu64, pInfo->numOfDatapack);
      pInfo->numOfDatapack = 0;
4764 4765
      break;
    }
5
54liuyao 已提交
4766
    pInfo->numOfDatapack++;
4767 4768
    printDataBlock(pBlock, "single interval recv");

4769 4770 4771
    if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
        pBlock->info.type == STREAM_CLEAR) {
      doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pUpdatedMap);
4772 4773 4774 4775 4776 4777 4778 4779 4780 4781 4782 4783 4784 4785 4786 4787 4788 4789 4790
      continue;
    } else if (pBlock->info.type == STREAM_GET_ALL) {
      getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap);
      continue;
    }

    if (pBlock->info.type == STREAM_NORMAL && pBlock->info.version != 0) {
      // set input version
      pTaskInfo->version = pBlock->info.version;
    }

    if (pInfo->scalarSupp.pExprInfo != NULL) {
      SExprSupp* pExprSup = &pInfo->scalarSupp;
      projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
    }

    // The timewindow that overlaps the timestamps of the input pBlock need to be recalculated and return to the
    // caller. Note that all the time window are not close till now.
    // the pDataBlock are always the same one, no need to call this again
4791
    setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
4792 4793 4794 4795 4796
    if (pInfo->invertible) {
      setInverFunction(pSup->pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.type);
    }

    maxTs = TMAX(maxTs, pBlock->info.window.ekey);
5
54liuyao 已提交
4797
    minTs = TMIN(minTs, pBlock->info.window.skey);
H
Haojun Liao 已提交
4798

H
Haojun Liao 已提交
4799
    doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pUpdatedMap);
4800 4801
  }
  pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
5
54liuyao 已提交
4802
  pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs);
4803
  pOperator->status = OP_RES_TO_RETURN;
4804
  removeDeleteResults(pUpdatedMap, pInfo->pDelWins);
5
54liuyao 已提交
4805
  closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdatedMap,
4806
                            pInfo->pDelWins, pOperator);
4807 4808 4809 4810 4811 4812 4813 4814 4815 4816

  void* pIte = NULL;
  while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) {
    taosArrayPush(pUpdated, pIte);
  }
  taosArraySort(pUpdated, resultrowComparAsc);

  initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
  blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
  taosHashCleanup(pUpdatedMap);
5
54liuyao 已提交
4817

4818
  doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
4819
  if (pInfo->pDelRes->info.rows > 0) {
5
54liuyao 已提交
4820
    printDataBlock(pInfo->pDelRes, "single interval delete");
4821 4822 4823
    return pInfo->pDelRes;
  }

4824
  doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
5
54liuyao 已提交
4825 4826 4827 4828 4829 4830
  if (pInfo->binfo.pRes->info.rows > 0) {
    printDataBlock(pInfo->binfo.pRes, "single interval");
    return pInfo->binfo.pRes;
  }

  return NULL;
4831 4832 4833 4834 4835
}

SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
                                                SExecTaskInfo* pTaskInfo) {
  SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo));
4836
  SOperatorInfo*               pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
4837 4838 4839 4840 4841
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }
  SStreamIntervalPhysiNode* pIntervalPhyNode = (SStreamIntervalPhysiNode*)pPhyNode;

H
Haojun Liao 已提交
4842
  int32_t    code = TSDB_CODE_SUCCESS;
4843 4844
  int32_t    numOfCols = 0;
  SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols);
H
Haojun Liao 已提交
4845

H
Haojun Liao 已提交
4846
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
4847 4848 4849 4850 4851 4852 4853 4854
  SInterval    interval = {
         .interval = pIntervalPhyNode->interval,
         .sliding = pIntervalPhyNode->sliding,
         .intervalUnit = pIntervalPhyNode->intervalUnit,
         .slidingUnit = pIntervalPhyNode->slidingUnit,
         .offset = pIntervalPhyNode->offset,
         .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision,
  };
H
Haojun Liao 已提交
4855

4856 4857 4858 4859
  STimeWindowAggSupp twAggSupp = {
      .waterMark = pIntervalPhyNode->window.watermark,
      .calTrigger = pIntervalPhyNode->window.triggerType,
      .maxTs = INT64_MIN,
5
54liuyao 已提交
4860
      .minTs = INT64_MAX,
5
54liuyao 已提交
4861
      .deleteMark = getDeleteMark(pIntervalPhyNode),
4862
  };
H
Haojun Liao 已提交
4863

5
54liuyao 已提交
4864
  ASSERTS(twAggSupp.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay");
4865

4866 4867 4868 4869 4870 4871 4872
  pOperator->pTaskInfo = pTaskInfo;
  pInfo->interval = interval;
  pInfo->twAggSup = twAggSupp;
  pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired;
  pInfo->isFinal = false;

  SExprSupp* pSup = &pOperator->exprSupp;
H
Haojun Liao 已提交
4873 4874 4875 4876
  initBasicInfo(&pInfo->binfo, pResBlock);
  initStreamFunciton(pSup->pCtx, pSup->numOfExprs);
  initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);

4877
  pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
4878
  initResultSizeInfo(&pOperator->resultInfo, 4096);
4879

4880
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
L
Liu Jicong 已提交
4881 4882
  code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
                    pTaskInfo->streamInfo.pState);
4883 4884 4885 4886
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
4887 4888 4889 4890 4891 4892 4893 4894
  if (pIntervalPhyNode->window.pExprs != NULL) {
    int32_t    numOfScalar = 0;
    SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar);
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }
4895 4896

  pInfo->invertible = allInvertible(pSup->pCtx, numOfCols);
4897
  pInfo->invertible = false;
4898 4899 4900 4901 4902
  pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
  pInfo->delIndex = 0;
  pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
  initResultRowInfo(&pInfo->binfo.resultRowInfo);

4903 4904 4905 4906 4907 4908 4909 4910 4911 4912 4913 4914 4915
  pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
  *(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
  streamStateSetNumber(pInfo->pState, -1);

  pInfo->pPhyNode = NULL;  // create new child
  pInfo->pPullDataMap = NULL;
  pInfo->pPullWins = NULL;  // SPullWindowInfo
  pInfo->pullIndex = 0;
  pInfo->pPullDataRes = NULL;
  pInfo->isFinal = false;
  pInfo->pChildren = NULL;
  pInfo->delKey.ts = INT64_MAX;
  pInfo->delKey.groupId = 0;
5
54liuyao 已提交
4916
  pInfo->numOfDatapack = 0;
4917

L
Liu Jicong 已提交
4918 4919
  setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
4920 4921
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamIntervalAgg, NULL,
                                         destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL);
4922

5
54liuyao 已提交
4923
  initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup);
4924 4925 4926 4927 4928 4929 4930 4931
  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
4932
  destroyStreamFinalIntervalOperatorInfo(pInfo);
4933 4934 4935 4936
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}
H
Haojun Liao 已提交
4937