timewindowoperator.c 186.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15
#include "executorimpl.h"
16
#include "filter.h"
X
Xiaoyu Wang 已提交
17
#include "function.h"
5
54liuyao 已提交
18
#include "functionMgt.h"
H
Haojun Liao 已提交
19
#include "tcommon.h"
5
54liuyao 已提交
20
#include "tcompare.h"
L
Liu Jicong 已提交
21
#include "tdatablock.h"
H
Haojun Liao 已提交
22
#include "tfill.h"
23
#include "ttime.h"
24

H
Haojun Liao 已提交
25
#define IS_FINAL_OP(op) ((op)->isFinal)
5
54liuyao 已提交
26
#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL);
H
Haojun Liao 已提交
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51

typedef struct SSessionAggOperatorInfo {
  SOptrBasicInfo     binfo;
  SAggSupporter      aggSup;
  SGroupResInfo      groupResInfo;
  SWindowRowsSup     winSup;
  bool               reptScan;  // next round scan
  int64_t            gap;       // session window gap
  int32_t            tsSlotId;  // primary timestamp slot id
  STimeWindowAggSupp twAggSup;
} SSessionAggOperatorInfo;

typedef struct SStateWindowOperatorInfo {
  SOptrBasicInfo     binfo;
  SAggSupporter      aggSup;
  SExprSupp          scalarSup;
  SGroupResInfo      groupResInfo;
  SWindowRowsSup     winSup;
  SColumn            stateCol;  // start row index
  bool               hasKey;
  SStateKeys         stateKey;
  int32_t            tsSlotId;  // primary timestamp column slot id
  STimeWindowAggSupp twAggSup;
} SStateWindowOperatorInfo;

52 53 54 55 56
typedef enum SResultTsInterpType {
  RESULT_ROW_START_INTERP = 1,
  RESULT_ROW_END_INTERP = 2,
} SResultTsInterpType;

5
54liuyao 已提交
57 58
typedef struct SPullWindowInfo {
  STimeWindow window;
L
Liu Jicong 已提交
59
  uint64_t    groupId;
5
54liuyao 已提交
60
  STimeWindow calWin;
5
54liuyao 已提交
61 62
} SPullWindowInfo;

63 64
typedef struct SOpenWindowInfo {
  SResultRowPosition pos;
L
Liu Jicong 已提交
65
  uint64_t           groupId;
66 67
} SOpenWindowInfo;

68 69
static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo);

L
Liu Jicong 已提交
70 71
static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult,
                                              uint64_t groupId);
72 73
static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult);

X
Xiaoyu Wang 已提交
74
static TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols) { return tsCols == NULL ? win->skey : tsCols[0]; }
75 76 77

static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, bool masterscan,
                                      SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
78
                                      int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
79 80 81 82 83 84 85 86 87 88 89
                                      SExecTaskInfo* pTaskInfo) {
  SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
                                                  masterscan, tableGroupId, pTaskInfo, true, pAggSup);

  if (pResultRow == NULL) {
    *pResult = NULL;
    return TSDB_CODE_SUCCESS;
  }

  // set time window for current result
  pResultRow->win = (*win);
90

91
  *pResult = pResultRow;
92
  setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
93

94 95 96 97 98 99 100 101 102 103 104 105 106
  return TSDB_CODE_SUCCESS;
}

static void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, bool includeEndpoint) {
  int64_t* ts = (int64_t*)pColData->pData;
  int32_t  delta = includeEndpoint ? 1 : 0;

  int64_t duration = pWin->ekey - pWin->skey + delta;
  ts[2] = duration;            // set the duration
  ts[3] = pWin->skey;          // window start key
  ts[4] = pWin->ekey + delta;  // window end key
}

107
static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) {
108 109 110
  pRowSup->win.ekey = ts;
  pRowSup->prevTs = ts;
  pRowSup->numOfRows += 1;
111
  pRowSup->groupId = groupId;
112 113
}

dengyihao's avatar
dengyihao 已提交
114 115
static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex,
                                     uint64_t groupId) {
116 117 118
  pRowSup->startRowIndex = rowIndex;
  pRowSup->numOfRows = 0;
  pRowSup->win.skey = tsList[rowIndex];
119
  pRowSup->groupId = groupId;
120 121 122 123
}

static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey,
                                                   int16_t pos, int16_t order, int64_t* pData) {
124
  int32_t forwardRows = 0;
125 126 127 128

  if (order == TSDB_ORDER_ASC) {
    int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order);
    if (end >= 0) {
129
      forwardRows = end;
130 131

      if (pData[end + pos] == ekey) {
132
        forwardRows += 1;
133 134 135
      }
    }
  } else {
136
    int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order);
137
    if (end >= 0) {
138
      forwardRows = end;
139

140
      if (pData[end + pos] == ekey) {
141
        forwardRows += 1;
142 143
      }
    }
X
Xiaoyu Wang 已提交
144 145 146 147 148 149 150 151
    //    int32_t end = searchFn((char*)pData, pos + 1, ekey, order);
    //    if (end >= 0) {
    //      forwardRows = pos - end;
    //
    //      if (pData[end] == ekey) {
    //        forwardRows += 1;
    //      }
    //    }
152 153
  }

154 155
  assert(forwardRows >= 0);
  return forwardRows;
156 157
}

5
54liuyao 已提交
158
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
  int32_t midPos = -1;
  int32_t numOfRows;

  if (num <= 0) {
    return -1;
  }

  assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);

  TSKEY*  keyList = (TSKEY*)pValue;
  int32_t firstPos = 0;
  int32_t lastPos = num - 1;

  if (order == TSDB_ORDER_DESC) {
    // find the first position which is smaller than the key
    while (1) {
175 176 177 178 179 180 181 182 183 184 185
      if (key >= keyList[firstPos]) return firstPos;
      if (key == keyList[lastPos]) return lastPos;

      if (key < keyList[lastPos]) {
        lastPos += 1;
        if (lastPos >= num) {
          return -1;
        } else {
          return lastPos;
        }
      }
186 187 188 189 190 191

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;

      if (key < keyList[midPos]) {
        firstPos = midPos + 1;
192 193
      } else if (key > keyList[midPos]) {
        lastPos = midPos - 1;
194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228
      } else {
        break;
      }
    }

  } else {
    // find the first position which is bigger than the key
    while (1) {
      if (key <= keyList[firstPos]) return firstPos;
      if (key == keyList[lastPos]) return lastPos;

      if (key > keyList[lastPos]) {
        lastPos = lastPos + 1;
        if (lastPos >= num)
          return -1;
        else
          return lastPos;
      }

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1u) + firstPos;

      if (key < keyList[midPos]) {
        lastPos = midPos - 1;
      } else if (key > keyList[midPos]) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }
  }

  return midPos;
}

X
Xiaoyu Wang 已提交
229 230
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
                                 __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order) {
231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251
  assert(startPos >= 0 && startPos < pDataBlockInfo->rows);

  int32_t num = -1;
  int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);

  if (order == TSDB_ORDER_ASC) {
    if (ekey < pDataBlockInfo->window.ekey && pPrimaryColumn) {
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
      if (item != NULL) {
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
      }
    } else {
      num = pDataBlockInfo->rows - startPos;
      if (item != NULL) {
        item->lastKey = pDataBlockInfo->window.ekey + step;
      }
    }
  } else {  // desc
    if (ekey > pDataBlockInfo->window.skey && pPrimaryColumn) {
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
      if (item != NULL) {
252
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
253 254
      }
    } else {
255
      num = pDataBlockInfo->rows - startPos;
256
      if (item != NULL) {
257
        item->lastKey = pDataBlockInfo->window.ekey + step;
258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288
      }
    }
  }

  assert(num >= 0);
  return num;
}

static void getNextTimeWindow(SInterval* pInterval, int32_t precision, int32_t order, STimeWindow* tw) {
  int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order);
  if (pInterval->intervalUnit != 'n' && pInterval->intervalUnit != 'y') {
    tw->skey += pInterval->sliding * factor;
    tw->ekey = tw->skey + pInterval->interval - 1;
    return;
  }

  int64_t key = tw->skey, interval = pInterval->interval;
  // convert key to second
  key = convertTimePrecision(key, precision, TSDB_TIME_PRECISION_MILLI) / 1000;

  if (pInterval->intervalUnit == 'y') {
    interval *= 12;
  }

  struct tm tm;
  time_t    t = (time_t)key;
  taosLocalTime(&t, &tm);

  int mon = (int)(tm.tm_year * 12 + tm.tm_mon + interval * factor);
  tm.tm_year = mon / 12;
  tm.tm_mon = mon % 12;
wafwerar's avatar
wafwerar 已提交
289
  tw->skey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, precision);
290 291 292 293

  mon = (int)(mon + interval);
  tm.tm_year = mon / 12;
  tm.tm_mon = mon % 12;
wafwerar's avatar
wafwerar 已提交
294
  tw->ekey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, precision);
295 296 297 298

  tw->ekey -= 1;
}

5
54liuyao 已提交
299 300 301 302
void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order) {
  getNextTimeWindow(pInterval, pInterval->precision, order, tw);
}

303 304
void doTimeWindowInterpolation(SArray* pPrevValues, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs,
                               int32_t curRowIndex, TSKEY windowKey, int32_t type, SExprSupp* pSup) {
305
  SqlFunctionCtx* pCtx = pSup->pCtx;
306

307
  int32_t index = 1;
308
  for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
H
Haojun Liao 已提交
309
    if (!fmIsIntervalInterpoFunc(pCtx[k].functionId)) {
310 311 312 313
      pCtx[k].start.key = INT64_MIN;
      continue;
    }

X
Xiaoyu Wang 已提交
314
    SFunctParam*     pParam = &pCtx[k].param[0];
315 316
    SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, pParam->pCol->slotId);

317
    ASSERT(pColInfo->info.type == pParam->pCol->type && curTs != windowKey);
318

319
    double v1 = 0, v2 = 0, v = 0;
320
    if (prevRowIndex == -1) {
321
      SGroupKeys* p = taosArrayGet(pPrevValues, index);
322
      GET_TYPED_DATA(v1, double, pColInfo->info.type, p->pData);
323
    } else {
324
      GET_TYPED_DATA(v1, double, pColInfo->info.type, colDataGetData(pColInfo, prevRowIndex));
325 326
    }

327
    GET_TYPED_DATA(v2, double, pColInfo->info.type, colDataGetData(pColInfo, curRowIndex));
328

329
#if 0
330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348
    if (functionId == FUNCTION_INTERP) {
      if (type == RESULT_ROW_START_INTERP) {
        pCtx[k].start.key = prevTs;
        pCtx[k].start.val = v1;

        pCtx[k].end.key = curTs;
        pCtx[k].end.val = v2;

        if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
          if (prevRowIndex == -1) {
            //            pCtx[k].start.ptr = (char*)pRuntimeEnv->prevRow[index];
          } else {
            pCtx[k].start.ptr = (char*)pColInfo->pData + prevRowIndex * pColInfo->info.bytes;
          }

          pCtx[k].end.ptr = (char*)pColInfo->pData + curRowIndex * pColInfo->info.bytes;
        }
      }
    } else if (functionId == FUNCTION_TWA) {
349 350
#endif

X
Xiaoyu Wang 已提交
351 352 353
    SPoint point1 = (SPoint){.key = prevTs, .val = &v1};
    SPoint point2 = (SPoint){.key = curTs, .val = &v2};
    SPoint point = (SPoint){.key = windowKey, .val = &v};
354

X
Xiaoyu Wang 已提交
355
    taosGetLinearInterpolationVal(&point, TSDB_DATA_TYPE_DOUBLE, &point1, &point2, TSDB_DATA_TYPE_DOUBLE);
356

X
Xiaoyu Wang 已提交
357 358 359 360 361 362
    if (type == RESULT_ROW_START_INTERP) {
      pCtx[k].start.key = point.key;
      pCtx[k].start.val = v;
    } else {
      pCtx[k].end.key = point.key;
      pCtx[k].end.val = v;
363
    }
X
Xiaoyu Wang 已提交
364 365 366

    index += 1;
  }
367
#if 0
368
  }
369
#endif
370 371 372 373 374 375 376 377 378 379 380 381 382 383
}

static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t type) {
  if (type == RESULT_ROW_START_INTERP) {
    for (int32_t k = 0; k < numOfOutput; ++k) {
      pCtx[k].start.key = INT64_MIN;
    }
  } else {
    for (int32_t k = 0; k < numOfOutput; ++k) {
      pCtx[k].end.key = INT64_MIN;
    }
  }
}

384 385
static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, int32_t pos, SSDataBlock* pBlock,
                                              const TSKEY* tsCols, STimeWindow* win, SExprSupp* pSup) {
386
  bool ascQuery = (pInfo->inputOrder == TSDB_ORDER_ASC);
387

388
  TSKEY curTs = tsCols[pos];
389 390

  SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
X
Xiaoyu Wang 已提交
391
  TSKEY       lastTs = *(int64_t*)pTsKey->pData;
392 393 394 395 396

  // lastTs == INT64_MIN and pos == 0 means this is the first time window, interpolation is not needed.
  // start exactly from this point, no need to do interpolation
  TSKEY key = ascQuery ? win->skey : win->ekey;
  if (key == curTs) {
397
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
398 399 400
    return true;
  }

401 402
  // it is the first time window, no need to do interpolation
  if (pTsKey->isNull && pos == 0) {
403
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
404 405
  } else {
    TSKEY prevTs = ((pos == 0) ? lastTs : tsCols[pos - 1]);
406 407
    doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, pos - 1, curTs, pos, key,
                              RESULT_ROW_START_INTERP, pSup);
408 409 410 411 412
  }

  return true;
}

413 414 415
static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex,
                                            SArray* pDataBlock, const TSKEY* tsCols, TSKEY blockEkey,
                                            STimeWindow* win) {
416
  int32_t order = pInfo->inputOrder;
417 418

  TSKEY actualEndKey = tsCols[endRowIndex];
419
  TSKEY key = (order == TSDB_ORDER_ASC) ? win->ekey : win->skey;
420 421

  // not ended in current data block, do not invoke interpolation
422
  if ((key > blockEkey && (order == TSDB_ORDER_ASC)) || (key < blockEkey && (order == TSDB_ORDER_DESC))) {
423
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
424 425 426
    return false;
  }

427
  // there is actual end point of current time window, no interpolation needs
428
  if (key == actualEndKey) {
429
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
430 431 432
    return true;
  }

433
  int32_t nextRowIndex = endRowIndex + 1;
434 435 436
  assert(nextRowIndex >= 0);

  TSKEY nextKey = tsCols[nextRowIndex];
437 438
  doTimeWindowInterpolation(pInfo->pPrevValues, pDataBlock, actualEndKey, endRowIndex, nextKey, nextRowIndex, key,
                            RESULT_ROW_END_INTERP, pSup);
439 440 441
  return true;
}

442 443
bool inCalSlidingWindow(SInterval* pInterval, STimeWindow* pWin, TSKEY calStart, TSKEY calEnd) {
  if (pInterval->interval != pInterval->sliding && (pWin->ekey < calStart || pWin->skey > calEnd)) {
5
54liuyao 已提交
444 445 446 447 448
    return false;
  }
  return true;
}

449 450 451 452
bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo) {
  return inCalSlidingWindow(pInterval, pWin, pBlockInfo->calWin.skey, pBlockInfo->calWin.ekey);
}

453
static int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
5
54liuyao 已提交
454
                                      TSKEY* primaryKeys, int32_t prevPosition, int32_t order) {
X
Xiaoyu Wang 已提交
455
  bool ascQuery = (order == TSDB_ORDER_ASC);
456 457 458 459 460 461 462 463 464 465

  int32_t precision = pInterval->precision;
  getNextTimeWindow(pInterval, precision, order, pNext);

  // next time window is not in current block
  if ((pNext->skey > pDataBlockInfo->window.ekey && order == TSDB_ORDER_ASC) ||
      (pNext->ekey < pDataBlockInfo->window.skey && order == TSDB_ORDER_DESC)) {
    return -1;
  }

5
54liuyao 已提交
466 467 468 469
  if (!inSlidingWindow(pInterval, pNext, pDataBlockInfo) && order == TSDB_ORDER_ASC) {
    return -1;
  }

470
  TSKEY   skey = ascQuery ? pNext->skey : pNext->ekey;
471 472 473 474
  int32_t startPos = 0;

  // tumbling time window query, a special case of sliding time window query
  if (pInterval->sliding == pInterval->interval && prevPosition != -1) {
475
    startPos = prevPosition + 1;
476
  } else {
477
    if ((skey <= pDataBlockInfo->window.skey && ascQuery) || (skey >= pDataBlockInfo->window.ekey && !ascQuery)) {
478 479
      startPos = 0;
    } else {
480
      startPos = binarySearchForKey((char*)primaryKeys, pDataBlockInfo->rows, skey, order);
481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523
    }
  }

  /* interp query with fill should not skip time window */
  //  if (pQueryAttr->pointInterpQuery && pQueryAttr->fillType != TSDB_FILL_NONE) {
  //    return startPos;
  //  }

  /*
   * This time window does not cover any data, try next time window,
   * this case may happen when the time window is too small
   */
  if (primaryKeys == NULL) {
    if (ascQuery) {
      assert(pDataBlockInfo->window.skey <= pNext->ekey);
    } else {
      assert(pDataBlockInfo->window.ekey >= pNext->skey);
    }
  } else {
    if (ascQuery && primaryKeys[startPos] > pNext->ekey) {
      TSKEY next = primaryKeys[startPos];
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
        pNext->skey = taosTimeTruncate(next, pInterval, precision);
        pNext->ekey = taosTimeAdd(pNext->skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
      } else {
        pNext->ekey += ((next - pNext->ekey + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
        pNext->skey = pNext->ekey - pInterval->interval + 1;
      }
    } else if ((!ascQuery) && primaryKeys[startPos] < pNext->skey) {
      TSKEY next = primaryKeys[startPos];
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
        pNext->skey = taosTimeTruncate(next, pInterval, precision);
        pNext->ekey = taosTimeAdd(pNext->skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
      } else {
        pNext->skey -= ((pNext->skey - next + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
        pNext->ekey = pNext->skey + pInterval->interval - 1;
      }
    }
  }

  return startPos;
}

524
static bool isResultRowInterpolated(SResultRow* pResult, SResultTsInterpType type) {
525
  ASSERT(pResult != NULL && (type == RESULT_ROW_START_INTERP || type == RESULT_ROW_END_INTERP));
526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541
  if (type == RESULT_ROW_START_INTERP) {
    return pResult->startInterp == true;
  } else {
    return pResult->endInterp == true;
  }
}

static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
  assert(pResult != NULL && (type == RESULT_ROW_START_INTERP || type == RESULT_ROW_END_INTERP));
  if (type == RESULT_ROW_START_INTERP) {
    pResult->startInterp = true;
  } else {
    pResult->endInterp = true;
  }
}

542 543
static void doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataBlock* pBlock, SResultRow* pResult,
                                        STimeWindow* win, int32_t startPos, int32_t forwardRows, SExprSupp* pSup) {
544
  if (!pInfo->timeWindowInterpo) {
545 546 547
    return;
  }

548
  ASSERT(pBlock != NULL);
549 550 551 552 553
  if (pBlock->pDataBlock == NULL) {
    //    tscError("pBlock->pDataBlock == NULL");
    return;
  }

554
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
555 556

  TSKEY* tsCols = (TSKEY*)(pColInfo->pData);
557
  bool   done = isResultRowInterpolated(pResult, RESULT_ROW_START_INTERP);
558
  if (!done) {  // it is not interpolated, now start to generated the interpolated value
559
    bool interp = setTimeWindowInterpolationStartTs(pInfo, startPos, pBlock, tsCols, win, pSup);
560 561 562 563
    if (interp) {
      setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
    }
  } else {
564
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
565 566 567 568 569 570 571 572
  }

  // point interpolation does not require the end key time window interpolation.
  //  if (pointInterpQuery) {
  //    return;
  //  }

  // interpolation query does not generate the time window end interpolation
573
  done = isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
574
  if (!done) {
575
    int32_t endRowIndex = startPos + forwardRows - 1;
576

577
    TSKEY endKey = (pInfo->inputOrder == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey;
578
    bool  interp = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, pBlock->pDataBlock, tsCols, endKey, win);
579 580 581 582
    if (interp) {
      setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
    }
  } else {
583
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
584 585 586
  }
}

587 588
static void saveDataBlockLastRow(SArray* pPrevKeys, const SSDataBlock* pBlock, SArray* pCols) {
  if (pBlock->pDataBlock == NULL) {
589 590 591
    return;
  }

592 593 594 595 596 597 598
  size_t num = taosArrayGetSize(pPrevKeys);
  for (int32_t k = 0; k < num; ++k) {
    SColumn* pc = taosArrayGet(pCols, k);

    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pc->slotId);

    SGroupKeys* pkey = taosArrayGet(pPrevKeys, k);
X
Xiaoyu Wang 已提交
599
    for (int32_t i = pBlock->info.rows - 1; i >= 0; --i) {
600 601 602 603 604 605 606
      if (colDataIsNull_s(pColInfo, i)) {
        continue;
      }

      char* val = colDataGetData(pColInfo, i);
      if (IS_VAR_DATA_TYPE(pkey->type)) {
        memcpy(pkey->pData, val, varDataTLen(val));
607
        ASSERT(varDataTLen(val) <= pkey->bytes);
608 609 610 611 612 613
      } else {
        memcpy(pkey->pData, val, pkey->bytes);
      }

      break;
    }
614 615 616
  }
}

617 618 619 620
static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t numOfExprs, SResultRowInfo* pResultRowInfo,
                                       SSDataBlock* pBlock, int32_t scanFlag, int64_t* tsCols, SResultRowPosition* p) {
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;

621
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
622
  SExprSupp*                pSup = &pOperatorInfo->exprSupp;
623

L
Liu Jicong 已提交
624 625
  int32_t startPos = 0;
  int32_t numOfOutput = pSup->numOfExprs;
626

627
  SResultRow* pResult = NULL;
628

629
  while (1) {
L
Liu Jicong 已提交
630 631 632
    SListNode*          pn = tdListGetHead(pResultRowInfo->openWindow);
    SOpenWindowInfo*    pOpenWin = (SOpenWindowInfo*)pn->data;
    uint64_t            groupId = pOpenWin->groupId;
633
    SResultRowPosition* p1 = &pOpenWin->pos;
634 635 636
    if (p->pageId == p1->pageId && p->offset == p1->offset) {
      break;
    }
637

638
    SResultRow* pr = getResultRowByPos(pInfo->aggSup.pResultBuf, p1, false);
639
    ASSERT(pr->offset == p1->offset && pr->pageId == p1->pageId);
640

641
    if (pr->closed) {
642
      ASSERT(isResultRowInterpolated(pr, RESULT_ROW_START_INTERP) &&
X
Xiaoyu Wang 已提交
643
             isResultRowInterpolated(pr, RESULT_ROW_END_INTERP));
644 645
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
      taosMemoryFree(pNode);
646 647
      continue;
    }
648

649
    STimeWindow w = pr->win;
650 651
    int32_t     ret = setTimeWindowOutputBuf(pResultRowInfo, &w, (scanFlag == MAIN_SCAN), &pResult, groupId, pSup->pCtx,
                                             numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
652
    if (ret != TSDB_CODE_SUCCESS) {
S
Shengliang Guan 已提交
653
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
654 655
    }

656
    ASSERT(!isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
657

X
Xiaoyu Wang 已提交
658 659
    SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
    int64_t     prevTs = *(int64_t*)pTsKey->pData;
H
Haojun Liao 已提交
660
    if (groupId == pBlock->info.id.groupId) {
661 662 663
      doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, -1, tsCols[startPos], startPos, w.ekey,
                                RESULT_ROW_END_INTERP, pSup);
    }
664 665

    setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
666
    setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP);
667

668
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, true);
H
Haojun Liao 已提交
669
    applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0, pBlock->info.rows,
H
Haojun Liao 已提交
670
                     numOfExprs);
671 672 673

    if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
      closeResultRow(pr);
674 675
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
      taosMemoryFree(pNode);
X
Xiaoyu Wang 已提交
676
    } else {  // the remains are can not be closed yet.
677
      break;
678
    }
679
  }
680
}
681

5
54liuyao 已提交
682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737
typedef int32_t (*__compare_fn_t)(void* pKey, void* data, int32_t index);

int32_t binarySearchCom(void* keyList, int num, void* pKey, int order, __compare_fn_t comparefn) {
  int firstPos = 0, lastPos = num - 1, midPos = -1;
  int numOfRows = 0;

  if (num <= 0) return -1;
  if (order == TSDB_ORDER_DESC) {
    // find the first position which is smaller or equal than the key
    while (1) {
      if (comparefn(pKey, keyList, lastPos) >= 0) return lastPos;
      if (comparefn(pKey, keyList, firstPos) == 0) return firstPos;
      if (comparefn(pKey, keyList, firstPos) < 0) return firstPos - 1;

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;

      if (comparefn(pKey, keyList, midPos) < 0) {
        lastPos = midPos - 1;
      } else if (comparefn(pKey, keyList, midPos) > 0) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }

  } else {
    // find the first position which is bigger or equal than the key
    while (1) {
      if (comparefn(pKey, keyList, firstPos) <= 0) return firstPos;
      if (comparefn(pKey, keyList, lastPos) == 0) return lastPos;

      if (comparefn(pKey, keyList, lastPos) > 0) {
        lastPos = lastPos + 1;
        if (lastPos >= num)
          return -1;
        else
          return lastPos;
      }

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;

      if (comparefn(pKey, keyList, midPos) < 0) {
        lastPos = midPos - 1;
      } else if (comparefn(pKey, keyList, midPos) > 0) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }
  }

  return midPos;
}

5
54liuyao 已提交
738
typedef int64_t (*__get_value_fn_t)(void* data, int32_t index);
739

X
Xiaoyu Wang 已提交
740 741 742
int32_t binarySearch(void* keyList, int num, TSKEY key, int order, __get_value_fn_t getValuefn) {
  int firstPos = 0, lastPos = num - 1, midPos = -1;
  int numOfRows = 0;
5
54liuyao 已提交
743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788

  if (num <= 0) return -1;
  if (order == TSDB_ORDER_DESC) {
    // find the first position which is smaller or equal than the key
    while (1) {
      if (key >= getValuefn(keyList, lastPos)) return lastPos;
      if (key == getValuefn(keyList, firstPos)) return firstPos;
      if (key < getValuefn(keyList, firstPos)) return firstPos - 1;

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;

      if (key < getValuefn(keyList, midPos)) {
        lastPos = midPos - 1;
      } else if (key > getValuefn(keyList, midPos)) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }

  } else {
    // find the first position which is bigger or equal than the key
    while (1) {
      if (key <= getValuefn(keyList, firstPos)) return firstPos;
      if (key == getValuefn(keyList, lastPos)) return lastPos;

      if (key > getValuefn(keyList, lastPos)) {
        lastPos = lastPos + 1;
        if (lastPos >= num)
          return -1;
        else
          return lastPos;
      }

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;

      if (key < getValuefn(keyList, midPos)) {
        lastPos = midPos - 1;
      } else if (key > getValuefn(keyList, midPos)) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }
789 790
  }

5
54liuyao 已提交
791 792 793
  return midPos;
}

5
54liuyao 已提交
794
int32_t comparePullWinKey(void* pKey, void* data, int32_t index) {
L
Liu Jicong 已提交
795
  SArray*          res = (SArray*)data;
5
54liuyao 已提交
796
  SPullWindowInfo* pos = taosArrayGet(res, index);
L
Liu Jicong 已提交
797
  SPullWindowInfo* pData = (SPullWindowInfo*)pKey;
5
54liuyao 已提交
798
  if (pData->groupId > pos->groupId) {
5
54liuyao 已提交
799
    return 1;
5
54liuyao 已提交
800 801 802 803 804 805 806 807
  } else if (pData->groupId < pos->groupId) {
    return -1;
  }

  if (pData->window.skey > pos->window.ekey) {
    return 1;
  } else if (pData->window.ekey < pos->window.skey) {
    return -1;
5
54liuyao 已提交
808
  }
5
54liuyao 已提交
809
  return 0;
5
54liuyao 已提交
810 811 812 813 814 815 816 817
}

static int32_t savePullWindow(SPullWindowInfo* pPullInfo, SArray* pPullWins) {
  int32_t size = taosArrayGetSize(pPullWins);
  int32_t index = binarySearchCom(pPullWins, size, pPullInfo, TSDB_ORDER_DESC, comparePullWinKey);
  if (index == -1) {
    index = 0;
  } else {
5
54liuyao 已提交
818 819 820 821 822 823 824
    int32_t code = comparePullWinKey(pPullInfo, pPullWins, index);
    if (code == 0) {
      SPullWindowInfo* pos = taosArrayGet(pPullWins ,index);
      pos->window.skey = TMIN(pos->window.skey, pPullInfo->window.skey);
      pos->window.ekey = TMAX(pos->window.ekey, pPullInfo->window.ekey);
      pos->calWin.skey = TMIN(pos->calWin.skey, pPullInfo->calWin.skey);
      pos->calWin.ekey = TMAX(pos->calWin.ekey, pPullInfo->calWin.ekey);
5
54liuyao 已提交
825
      return TSDB_CODE_SUCCESS;
5
54liuyao 已提交
826 827
    } else if (code > 0 ){
      index++;
5
54liuyao 已提交
828 829 830 831 832 833 834 835
    }
  }
  if (taosArrayInsert(pPullWins, index, pPullInfo) == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  return TSDB_CODE_SUCCESS;
}

5
54liuyao 已提交
836 837 838
static int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated) {
  winInfo.sessionWin.win.ekey = winInfo.sessionWin.win.skey;
  return tSimpleHashPut(pStUpdated, &winInfo.sessionWin, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo));
5
54liuyao 已提交
839 840
}

5
54liuyao 已提交
841 842 843 844 845 846 847 848
static int32_t saveWinResult(int64_t ts, int32_t pageId, int32_t offset, uint64_t groupId, SHashObj* pUpdatedMap) {
  SResKeyPos* newPos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
  if (newPos == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  newPos->groupId = groupId;
  newPos->pos = (SResultRowPosition){.pageId = pageId, .offset = offset};
  *(int64_t*)newPos->key = ts;
H
Haojun Liao 已提交
849 850
  SWinKey key = {.ts = ts, .groupId = groupId};
  if (taosHashPut(pUpdatedMap, &key, sizeof(SWinKey), &newPos, sizeof(void*)) != TSDB_CODE_SUCCESS) {
5
54liuyao 已提交
851 852 853
    taosMemoryFree(newPos);
  }
  return TSDB_CODE_SUCCESS;
5
54liuyao 已提交
854 855
}

5
54liuyao 已提交
856 857 858 859
static int32_t saveWinResultInfo(TSKEY ts, uint64_t groupId, SHashObj* pUpdatedMap) {
  return saveWinResult(ts, -1, -1, groupId, pUpdatedMap);
}

5
54liuyao 已提交
860
static void removeResults(SArray* pWins, SHashObj* pUpdatedMap) {
5
54liuyao 已提交
861 862
  int32_t size = taosArrayGetSize(pWins);
  for (int32_t i = 0; i < size; i++) {
H
Haojun Liao 已提交
863
    SWinKey* pW = taosArrayGet(pWins, i);
5
54liuyao 已提交
864 865 866 867 868 869
    void*    tmp = taosHashGet(pUpdatedMap, pW, sizeof(SWinKey));
    if (tmp) {
      void* value = *(void**)tmp;
      taosMemoryFree(value);
      taosHashRemove(pUpdatedMap, pW, sizeof(SWinKey));
    }
5
54liuyao 已提交
870 871 872
  }
}

5
54liuyao 已提交
873 874
int32_t compareWinRes(void* pKey, void* data, int32_t index) {
  SArray*     res = (SArray*)data;
5
54liuyao 已提交
875 876 877
  SWinKey*    pDataPos = taosArrayGet(res, index);
  SResKeyPos* pRKey = (SResKeyPos*)pKey;
  if (pRKey->groupId > pDataPos->groupId) {
5
54liuyao 已提交
878
    return 1;
5
54liuyao 已提交
879 880
  } else if (pRKey->groupId < pDataPos->groupId) {
    return -1;
5
54liuyao 已提交
881
  }
5
54liuyao 已提交
882 883 884 885 886 887 888
  
  if (*(int64_t*)pRKey->key > pDataPos->ts) {
    return 1;
  } else if (*(int64_t*)pRKey->key < pDataPos->ts){
    return -1;
  }
  return 0;
5
54liuyao 已提交
889 890
}

5
54liuyao 已提交
891
static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) {
5
54liuyao 已提交
892 893
  taosArraySort(pDelWins, winKeyCmprImpl);
  taosArrayRemoveDuplicate(pDelWins, winKeyCmprImpl, NULL);
L
Liu Jicong 已提交
894 895
  int32_t delSize = taosArrayGetSize(pDelWins);
  if (taosHashGetSize(pUpdatedMap) == 0 || delSize == 0) {
5
54liuyao 已提交
896
    return;
dengyihao's avatar
dengyihao 已提交
897
  }
L
Liu Jicong 已提交
898
  void* pIte = NULL;
5
54liuyao 已提交
899
  while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) {
900
    SResKeyPos* pResKey = *(SResKeyPos**)pIte;
5
54liuyao 已提交
901 902
    int32_t     index = binarySearchCom(pDelWins, delSize, pResKey, TSDB_ORDER_DESC, compareWinRes);
    if (index >= 0 && 0 == compareWinRes(pResKey, pDelWins, index)) {
903
      taosArrayRemove(pDelWins, index);
904
      delSize = taosArrayGetSize(pDelWins);
905 906 907 908
    }
  }
}

5
54liuyao 已提交
909
bool isOverdue(TSKEY ekey, STimeWindowAggSupp* pTwSup) {
910
  ASSERT(pTwSup->maxTs == INT64_MIN || pTwSup->maxTs > 0);
5
54liuyao 已提交
911
  return pTwSup->maxTs != INT64_MIN && ekey < pTwSup->maxTs - pTwSup->waterMark;
5
54liuyao 已提交
912 913
}

5
54liuyao 已提交
914 915 916 917 918
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pTwSup) { return isOverdue(pWin->ekey, pTwSup); }

bool needDeleteWindowBuf(STimeWindow* pWin, STimeWindowAggSupp* pTwSup) {
  return pTwSup->maxTs != INT64_MIN && pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark;
}
5
54liuyao 已提交
919

5
54liuyao 已提交
920
static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
921
                            int32_t scanFlag) {
922
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
923

924
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
925
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;
926

X
Xiaoyu Wang 已提交
927
  int32_t     startPos = 0;
928
  int32_t     numOfOutput = pSup->numOfExprs;
X
Xiaoyu Wang 已提交
929
  int64_t*    tsCols = extractTsCol(pBlock, pInfo);
H
Haojun Liao 已提交
930
  uint64_t    tableGroupId = pBlock->info.id.groupId;
931
  bool        ascScan = (pInfo->inputOrder == TSDB_ORDER_ASC);
X
Xiaoyu Wang 已提交
932 933
  TSKEY       ts = getStartTsKey(&pBlock->info.window, tsCols);
  SResultRow* pResult = NULL;
934

935 936
  STimeWindow win =
      getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->inputOrder);
937 938
  int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
939
  if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
S
Shengliang Guan 已提交
940
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
941
  }
X
Xiaoyu Wang 已提交
942 943
  TSKEY   ekey = ascScan ? win.ekey : win.skey;
  int32_t forwardRows =
944
      getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->inputOrder);
945
  ASSERT(forwardRows > 0);
946 947

  // prev time window not interpolation yet.
948
  if (pInfo->timeWindowInterpo) {
949
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId);
950
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
951 952

    // restore current time window
953 954
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
                                 numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
955
    if (ret != TSDB_CODE_SUCCESS) {
S
Shengliang Guan 已提交
956
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
957 958
    }

959
    // window start key interpolation
960
    doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup);
961
  }
962

963
  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true);
H
Haojun Liao 已提交
964
  applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows,
965
                   numOfOutput);
966 967

  doCloseWindow(pResultRowInfo, pInfo, pResult);
968 969 970

  STimeWindow nextWin = win;
  while (1) {
971
    int32_t prevEndPos = forwardRows - 1 + startPos;
972
    startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, pInfo->inputOrder);
973 974 975 976
    if (startPos < 0) {
      break;
    }
    // null data, failed to allocate more memory buffer
X
Xiaoyu Wang 已提交
977
    int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
978
                                          pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
979
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
S
Shengliang Guan 已提交
980
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
981 982
    }

X
Xiaoyu Wang 已提交
983
    ekey = ascScan ? nextWin.ekey : nextWin.skey;
984
    forwardRows =
985
        getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->inputOrder);
986
    // window start(end) key interpolation
987
    doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
L
Liu Jicong 已提交
988
    // TODO: add to open window? how to close the open windows after input blocks exhausted?
S
shenglian zhou 已提交
989
#if 0
990 991 992 993
    if ((ascScan && ekey <= pBlock->info.window.ekey) ||
        (!ascScan && ekey >= pBlock->info.window.skey)) {
      // window start(end) key interpolation
      doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
994
    } else if (pInfo->timeWindowInterpo) {
995 996
      addToOpenWindowList(pResultRowInfo, pResult, tableGroupId);
    }
S
shenglian zhou 已提交
997
#endif
998
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
H
Haojun Liao 已提交
999
    applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows,
H
Haojun Liao 已提交
1000
                     numOfOutput);
1001
    doCloseWindow(pResultRowInfo, pInfo, pResult);
1002 1003 1004
  }

  if (pInfo->timeWindowInterpo) {
1005
    saveDataBlockLastRow(pInfo->pPrevValues, pBlock, pInfo->pInterpCols);
1006
  }
1007 1008 1009 1010 1011 1012
}

void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult) {
  // current result is done in computing final results.
  if (pInfo->timeWindowInterpo && isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
    closeResultRow(pResult);
1013
    SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
D
dapan1121 已提交
1014
    taosMemoryFree(pNode);
1015 1016 1017
  }
}

1018 1019 1020 1021 1022
SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId) {
  SOpenWindowInfo openWin = {0};
  openWin.pos.pageId = pResult->pageId;
  openWin.pos.offset = pResult->offset;
  openWin.groupId = groupId;
L
Liu Jicong 已提交
1023
  SListNode* pn = tdListGetTail(pResultRowInfo->openWindow);
1024
  if (pn == NULL) {
1025 1026
    tdListAppend(pResultRowInfo->openWindow, &openWin);
    return openWin.pos;
1027 1028
  }

L
Liu Jicong 已提交
1029
  SOpenWindowInfo* px = (SOpenWindowInfo*)pn->data;
1030 1031
  if (px->pos.pageId != openWin.pos.pageId || px->pos.offset != openWin.pos.offset || px->groupId != openWin.groupId) {
    tdListAppend(pResultRowInfo->openWindow, &openWin);
1032 1033
  }

1034
  return openWin.pos;
1035 1036 1037 1038
}

int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo) {
  TSKEY* tsCols = NULL;
1039

1040 1041 1042 1043
  if (pBlock->pDataBlock != NULL) {
    SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
    tsCols = (int64_t*)pColDataInfo->pData;

1044 1045 1046 1047 1048 1049
    // no data in primary ts
    if (tsCols[0] == 0 && tsCols[pBlock->info.rows - 1] == 0) {
      return NULL;
    }

    if (tsCols[0] != 0 && (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0)) {
1050 1051 1052 1053 1054
      blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
    }
  }

  return tsCols;
1055 1056 1057 1058 1059 1060 1061
}

static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

1062 1063 1064
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SOperatorInfo* downstream = pOperator->pDownstream[0];

1065
  SIntervalAggOperatorInfo* pInfo = pOperator->info;
1066
  SExprSupp*                pSup = &pOperator->exprSupp;
1067

1068
  int32_t scanFlag = MAIN_SCAN;
1069
  int64_t st = taosGetTimestampUs();
1070 1071

  while (1) {
1072
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
1073 1074 1075 1076
    if (pBlock == NULL) {
      break;
    }

1077
    getTableScanInfo(pOperator, &pInfo->inputOrder, &scanFlag);
1078

1079
    if (pInfo->scalarSupp.pExprInfo != NULL) {
L
Liu Jicong 已提交
1080 1081
      SExprSupp* pExprSup = &pInfo->scalarSupp;
      projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
1082 1083
    }

1084
    // the pDataBlock are always the same one, no need to call this again
1085
    setInputDataBlock(pSup, pBlock, pInfo->inputOrder, scanFlag, true);
1086 1087
    blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);

1088
    hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag);
1089 1090
  }

1091
  initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->resultTsOrder);
1092
  OPTR_SET_OPENED(pOperator);
1093 1094

  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
1095 1096 1097
  return TSDB_CODE_SUCCESS;
}

1098 1099 1100 1101 1102
static bool compareVal(const char* v, const SStateKeys* pKey) {
  if (IS_VAR_DATA_TYPE(pKey->type)) {
    if (varDataLen(v) != varDataLen(pKey->pData)) {
      return false;
    } else {
D
dapan1121 已提交
1103
      return memcmp(varDataVal(v), varDataVal(pKey->pData), varDataLen(v)) == 0;
1104 1105 1106 1107 1108 1109
    }
  } else {
    return memcmp(pKey->pData, v, pKey->bytes) == 0;
  }
}

1110
static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock) {
L
Liu Jicong 已提交
1111
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
1112
  SExprSupp*     pSup = &pOperator->exprSupp;
1113

1114
  SColumnInfoData* pStateColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->stateCol.slotId);
H
Haojun Liao 已提交
1115
  int64_t          gid = pBlock->info.id.groupId;
1116 1117

  bool    masterScan = true;
1118
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
1119 1120
  int16_t bytes = pStateColInfoData->info.bytes;

1121
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
1122 1123 1124 1125 1126
  TSKEY*           tsList = (TSKEY*)pColInfoData->pData;

  SWindowRowsSup* pRowSup = &pInfo->winSup;
  pRowSup->numOfRows = 0;

1127
  struct SColumnDataAgg* pAgg = NULL;
1128
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
X
Xiaoyu Wang 已提交
1129
    pAgg = (pBlock->pBlockAgg != NULL) ? pBlock->pBlockAgg[pInfo->stateCol.slotId] : NULL;
1130
    if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pAgg)) {
1131 1132 1133 1134 1135
      continue;
    }

    char* val = colDataGetData(pStateColInfoData, j);

1136
    if (gid != pRowSup->groupId || !pInfo->hasKey) {
1137 1138 1139 1140 1141 1142 1143
      // todo extract method
      if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) {
        varDataCopy(pInfo->stateKey.pData, val);
      } else {
        memcpy(pInfo->stateKey.pData, val, bytes);
      }

1144 1145
      pInfo->hasKey = true;

1146 1147
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
      doKeepTuple(pRowSup, tsList[j], gid);
1148
    } else if (compareVal(val, &pInfo->stateKey)) {
1149
      doKeepTuple(pRowSup, tsList[j], gid);
1150 1151 1152 1153 1154 1155 1156 1157 1158 1159
      if (j == 0 && pRowSup->startRowIndex != 0) {
        pRowSup->startRowIndex = 0;
      }
    } else {  // a new state window started
      SResultRow* pResult = NULL;

      // keep the time window for the closed time window.
      STimeWindow window = pRowSup->win;

      pRowSup->win.ekey = pRowSup->win.skey;
1160 1161
      int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
                                           numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1162
      if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
S
Shengliang Guan 已提交
1163
        T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
1164 1165 1166
      }

      updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false);
H
Haojun Liao 已提交
1167
      applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
1168
                       pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
1169 1170

      // here we start a new session window
1171 1172
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
      doKeepTuple(pRowSup, tsList[j], gid);
1173 1174 1175 1176 1177 1178 1179

      // todo extract method
      if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) {
        varDataCopy(pInfo->stateKey.pData, val);
      } else {
        memcpy(pInfo->stateKey.pData, val, bytes);
      }
1180 1181 1182 1183 1184
    }
  }

  SResultRow* pResult = NULL;
  pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
1185 1186
  int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid,
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1187
  if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
S
Shengliang Guan 已提交
1188
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
1189 1190 1191
  }

  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false);
H
Haojun Liao 已提交
1192
  applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows,
H
Haojun Liao 已提交
1193
                   pBlock->info.rows, numOfOutput);
1194 1195
}

H
Hongze Cheng 已提交
1196
static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) {
1197 1198
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
1199 1200 1201
  }

  SStateWindowOperatorInfo* pInfo = pOperator->info;
1202
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
1203

1204 1205 1206
  SExprSupp* pSup = &pOperator->exprSupp;
  int32_t    order = TSDB_ORDER_ASC;
  int64_t    st = taosGetTimestampUs();
1207 1208 1209

  SOperatorInfo* downstream = pOperator->pDownstream[0];
  while (1) {
1210
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
1211 1212 1213 1214
    if (pBlock == NULL) {
      break;
    }

1215
    setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true);
1216 1217
    blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);

1218 1219 1220 1221 1222 1223 1224 1225 1226
    // there is an scalar expression that needs to be calculated right before apply the group aggregation.
    if (pInfo->scalarSup.pExprInfo != NULL) {
      pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
                                              pInfo->scalarSup.numOfExprs, NULL);
      if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
      }
    }

1227 1228 1229
    doStateWindowAggImpl(pOperator, pInfo, pBlock);
  }

X
Xiaoyu Wang 已提交
1230
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
1231
  initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
1232 1233
  pOperator->status = OP_RES_TO_RETURN;

1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247
  return TSDB_CODE_SUCCESS;
}

static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SStateWindowOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
  SOptrBasicInfo*           pBInfo = &pInfo->binfo;

  pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1248
    setOperatorCompleted(pOperator);
1249 1250 1251
    return NULL;
  }

1252
  blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
1253
  while (1) {
1254
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
H
Haojun Liao 已提交
1255
    doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
1256

1257
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
1258
    if (!hasRemain) {
H
Haojun Liao 已提交
1259
      setOperatorCompleted(pOperator);
1260 1261
      break;
    }
1262

1263 1264 1265 1266
    if (pBInfo->pRes->info.rows > 0) {
      break;
    }
  }
1267

1268
  pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
1269
  return (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
1270 1271
}

1272
static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
1273
  SIntervalAggOperatorInfo* pInfo = pOperator->info;
L
Liu Jicong 已提交
1274
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
1275 1276 1277 1278 1279 1280

  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SSDataBlock* pBlock = pInfo->binfo.pRes;
1281 1282 1283 1284
  pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
    return NULL;
  }
1285

1286 1287 1288
  while (1) {
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
    doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
1289

1290 1291
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
    if (!hasRemain) {
H
Haojun Liao 已提交
1292
      setOperatorCompleted(pOperator);
1293
      break;
1294 1295
    }

1296 1297 1298
    if (pBlock->info.rows > 0) {
      break;
    }
1299
  }
1300 1301 1302 1303 1304

  size_t rows = pBlock->info.rows;
  pOperator->resultInfo.totalRows += rows;

  return (rows == 0) ? NULL : pBlock;
1305 1306
}

5
54liuyao 已提交
1307
static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type) {
L
Liu Jicong 已提交
1308
  for (int i = 0; i < num; i++) {
5
54liuyao 已提交
1309 1310
    if (type == STREAM_INVERT) {
      fmSetInvertFunc(pCtx[i].functionId, &(pCtx[i].fpSet));
L
Liu Jicong 已提交
1311
    } else if (type == STREAM_NORMAL) {
5
54liuyao 已提交
1312 1313 1314 1315
      fmSetNormalFunc(pCtx[i].functionId, &(pCtx[i].fpSet));
    }
  }
}
5
54liuyao 已提交
1316

5
54liuyao 已提交
1317
static void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprSupp* pSup, int32_t numOfOutput) {
1318
  SResultRow*     pResult = getResultRowByPos(pResultBuf, p1, false);
1319
  SqlFunctionCtx* pCtx = pSup->pCtx;
5
54liuyao 已提交
1320
  for (int32_t i = 0; i < numOfOutput; ++i) {
1321
    pCtx[i].resultInfo = getResultEntryInfo(pResult, i, pSup->rowEntryInfoOffset);
5
54liuyao 已提交
1322 1323 1324 1325 1326 1327 1328 1329 1330
    struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo;
    if (fmIsWindowPseudoColumnFunc(pCtx[i].functionId)) {
      continue;
    }
    pResInfo->initialized = false;
    if (pCtx[i].functionId != -1) {
      pCtx[i].fpSet.init(&pCtx[i], pResInfo);
    }
  }
5
54liuyao 已提交
1331 1332 1333
  SFilePage* bufPage = getBufPage(pResultBuf, p1->pageId);
  setBufPageDirty(bufPage, true);
  releaseBufPage(pResultBuf, bufPage);
5
54liuyao 已提交
1334 1335
}

1336
static bool doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId) {
5
54liuyao 已提交
1337 1338 1339
  SStreamIntervalOperatorInfo* pInfo = pOperator->info;
  SWinKey                      key = {.ts = ts, .groupId = groupId};
  tSimpleHashRemove(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey));
1340
  streamStateDel(pInfo->pState, &key);
5
54liuyao 已提交
1341 1342 1343
  return true;
}

1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356
static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDataBlock* pBlock, SArray* pUpWins,
                            SHashObj* pUpdatedMap) {
  SStreamIntervalOperatorInfo* pInfo = pOperator->info;
  SColumnInfoData*             pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  TSKEY*                       startTsCols = (TSKEY*)pStartTsCol->pData;
  SColumnInfoData*             pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  TSKEY*                       endTsCols = (TSKEY*)pEndTsCol->pData;
  SColumnInfoData*             pCalStTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  TSKEY*                       calStTsCols = (TSKEY*)pCalStTsCol->pData;
  SColumnInfoData*             pCalEnTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  TSKEY*                       calEnTsCols = (TSKEY*)pCalEnTsCol->pData;
  SColumnInfoData*             pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  uint64_t*                    pGpDatas = (uint64_t*)pGpCol->pData;
5
54liuyao 已提交
1357
  for (int32_t i = 0; i < pBlock->info.rows; i++) {
H
Haojun Liao 已提交
1358
    SResultRowInfo dumyInfo = {0};
5
54liuyao 已提交
1359
    dumyInfo.cur.pageId = -1;
H
Haojun Liao 已提交
1360

1361 1362 1363 1364 1365 1366 1367 1368
    STimeWindow win = {0};
    if (IS_FINAL_OP(pInfo)) {
      win.skey = startTsCols[i];
      win.ekey = endTsCols[i];
    } else {
      win = getActiveTimeWindow(NULL, &dumyInfo, startTsCols[i], pInterval, TSDB_ORDER_ASC);
    }

5
54liuyao 已提交
1369
    do {
1370 1371 1372 1373
      if (!inCalSlidingWindow(pInterval, &win, calStTsCols[i], calEnTsCols[i])) {
        getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win);
        continue;
      }
5
54liuyao 已提交
1374
      uint64_t winGpId = pGpDatas[i];
1375
      bool     res = doDeleteWindow(pOperator, win.skey, winGpId);
5
54liuyao 已提交
1376 1377 1378 1379 1380
      SWinKey  winRes = {.ts = win.skey, .groupId = winGpId};
      if (pUpWins && res) {
        taosArrayPush(pUpWins, &winRes);
      }
      if (pUpdatedMap) {
5
54liuyao 已提交
1381 1382 1383 1384 1385 1386
        void* tmp = taosHashGet(pUpdatedMap, &winRes, sizeof(SWinKey));
        if (tmp) {
          void* value = *(void**)tmp;
          taosMemoryFree(value);
          taosHashRemove(pUpdatedMap, &winRes, sizeof(SWinKey));
        }
5
54liuyao 已提交
1387 1388
      }
      getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win);
5
54liuyao 已提交
1389
    } while (win.ekey <= endTsCols[i]);
5
54liuyao 已提交
1390 1391 1392
  }
}

1393 1394 1395 1396 1397 1398
static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) {
  void*   pIte = NULL;
  size_t  keyLen = 0;
  int32_t iter = 0;
  while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
    void*    key = tSimpleHashGetKey(pIte, &keyLen);
5
54liuyao 已提交
1399
    uint64_t groupId = *(uint64_t*)key;
1400
    ASSERT(keyLen == GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY)));
1401
    TSKEY               ts = *(int64_t*)((char*)key + sizeof(uint64_t));
5
54liuyao 已提交
1402
    SResultRowPosition* pPos = (SResultRowPosition*)pIte;
5
54liuyao 已提交
1403
    int32_t             code = saveWinResult(ts, pPos->pageId, pPos->offset, groupId, resWins);
5
54liuyao 已提交
1404 1405 1406 1407 1408 1409 1410
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }
  return TSDB_CODE_SUCCESS;
}

1411 1412
int32_t compareWinKey(void* pKey, void* data, int32_t index) {
  SArray*  res = (SArray*)data;
5
54liuyao 已提交
1413 1414 1415 1416
  SWinKey* pDataPos = taosArrayGet(res, index);
  SWinKey* pWKey = (SWinKey*)pKey;

  if (pWKey->groupId > pDataPos->groupId) {
1417
    return 1;
5
54liuyao 已提交
1418 1419
  } else if (pWKey->groupId < pDataPos->groupId) {
    return -1;
1420
  }
5
54liuyao 已提交
1421 1422 1423 1424 1425 1426 1427

  if (pWKey->ts > pDataPos->ts) {
    return 1;
  } else if (pWKey->ts < pDataPos->ts) {
    return -1;
  }
  return 0;
1428 1429
}

5
54liuyao 已提交
1430
static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SInterval* pInterval,
1431 1432
                                         SHashObj* pPullDataMap, SHashObj* closeWins, SArray* pDelWins,
                                         SOperatorInfo* pOperator) {
5
54liuyao 已提交
1433
  qDebug("===stream===close interval window");
1434 1435 1436 1437
  void*                        pIte = NULL;
  size_t                       keyLen = 0;
  int32_t                      iter = 0;
  SStreamIntervalOperatorInfo* pInfo = pOperator->info;
1438
  int32_t                      delSize = taosArrayGetSize(pDelWins);
5
54liuyao 已提交
1439
  while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
1440 1441 1442 1443 1444 1445 1446 1447 1448 1449
    void*    key = tSimpleHashGetKey(pIte, &keyLen);
    SWinKey* pWinKey = (SWinKey*)key;
    if (delSize > 0) {
      int32_t index = binarySearchCom(pDelWins, delSize, pWinKey, TSDB_ORDER_DESC, compareWinKey);
      if (index >= 0 && 0 == compareWinKey(pWinKey, pDelWins, index)) {
        taosArrayRemove(pDelWins, index);
        delSize = taosArrayGetSize(pDelWins);
      }
    }

5
54liuyao 已提交
1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474
    void*       chIds = taosHashGet(pPullDataMap, pWinKey, sizeof(SWinKey));
    STimeWindow win = {
        .skey = pWinKey->ts,
        .ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1,
    };
    if (isCloseWindow(&win, pTwSup)) {
      if (chIds && pPullDataMap) {
        SArray* chAy = *(SArray**)chIds;
        int32_t size = taosArrayGetSize(chAy);
        qDebug("===stream===window %" PRId64 " wait child size:%d", pWinKey->ts, size);
        for (int32_t i = 0; i < size; i++) {
          qDebug("===stream===window %" PRId64 " wait child id:%d", pWinKey->ts, *(int32_t*)taosArrayGet(chAy, i));
        }
        continue;
      } else if (pPullDataMap) {
        qDebug("===stream===close window %" PRId64, pWinKey->ts);
      }

      if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
        int32_t code = saveWinResultInfo(pWinKey->ts, pWinKey->groupId, closeWins);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }
      tSimpleHashIterateRemove(pHashMap, pWinKey, sizeof(SWinKey), &pIte, &iter);
1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519
    }
  }
  return TSDB_CODE_SUCCESS;
}

STimeWindow getFinalTimeWindow(int64_t ts, SInterval* pInterval) {
  STimeWindow w = {.skey = ts, .ekey = INT64_MAX};
  w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
  return w;
}

static void deleteIntervalDiscBuf(SStreamState* pState, SHashObj* pPullDataMap, TSKEY mark, SInterval* pInterval,
                                  SWinKey* key) {
  STimeWindow tw = getFinalTimeWindow(key->ts, pInterval);
  SWinKey     next = {0};
  while (tw.ekey < mark) {
    SStreamStateCur* pCur = streamStateSeekKeyNext(pState, key);
    int32_t          code = streamStateGetKVByCur(pCur, &next, NULL, 0);
    streamStateFreeCur(pCur);

    void* chIds = taosHashGet(pPullDataMap, key, sizeof(SWinKey));
    if (chIds && pPullDataMap) {
      SArray* chAy = *(SArray**)chIds;
      int32_t size = taosArrayGetSize(chAy);
      qDebug("===stream===window %" PRId64 " wait child size:%d", key->ts, size);
      for (int32_t i = 0; i < size; i++) {
        qDebug("===stream===window %" PRId64 " wait child id:%d", key->ts, *(int32_t*)taosArrayGet(chAy, i));
      }
      break;
    }
    qDebug("===stream===delete window %" PRId64, key->ts);
    int32_t codeDel = streamStateDel(pState, key);
    if (codeDel != TSDB_CODE_SUCCESS) {
      code = streamStateGetFirst(pState, key);
      if (code != TSDB_CODE_SUCCESS) {
        qDebug("===stream===stream state first key: empty-empty");
        return;
      }
      continue;
    }
    if (code == TSDB_CODE_SUCCESS) {
      *key = next;
      tw = getFinalTimeWindow(key->ts, pInterval);
    }
  }
5
54liuyao 已提交
1520

5
54liuyao 已提交
1521 1522
  // for debug
  if (qDebugFlag & DEBUG_DEBUG && mark > 0) {
1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535
    SStreamStateCur* pCur = streamStateGetCur(pState, key);
    int32_t          code = streamStateCurPrev(pState, pCur);
    if (code == TSDB_CODE_SUCCESS) {
      SWinKey tmpKey = {0};
      code = streamStateGetKVByCur(pCur, &tmpKey, NULL, 0);
      if (code == TSDB_CODE_SUCCESS) {
        STimeWindow tw = getFinalTimeWindow(tmpKey.ts, pInterval);
        qDebug("===stream===error stream state first key:%" PRId64 "-%" PRId64 ",%" PRId64 ",mark %" PRId64, tw.skey,
               tw.ekey, tmpKey.groupId, mark);
      } else {
        STimeWindow tw = getFinalTimeWindow(key->ts, pInterval);
        qDebug("===stream===stream state first key:%" PRId64 "-%" PRId64 ",%" PRId64 ",mark %" PRId64, tw.skey, tw.ekey,
               key->groupId, mark);
5
54liuyao 已提交
1536
      }
1537 1538 1539 1540
    } else {
      STimeWindow tw = getFinalTimeWindow(key->ts, pInterval);
      qDebug("===stream===stream state first key:%" PRId64 "-%" PRId64 ",%" PRId64 ",mark %" PRId64, tw.skey, tw.ekey,
             key->groupId, mark);
5
54liuyao 已提交
1541
    }
1542
    streamStateFreeCur(pCur);
5
54liuyao 已提交
1543 1544 1545
  }
}

1546
static void closeChildIntervalWindow(SOperatorInfo* pOperator, SArray* pChildren, TSKEY maxTs) {
5
54liuyao 已提交
1547 1548
  int32_t size = taosArrayGetSize(pChildren);
  for (int32_t i = 0; i < size; i++) {
1549 1550
    SOperatorInfo*               pChildOp = taosArrayGetP(pChildren, i);
    SStreamIntervalOperatorInfo* pChInfo = pChildOp->info;
1551
    ASSERT(pChInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);
5
54liuyao 已提交
1552
    pChInfo->twAggSup.maxTs = TMAX(pChInfo->twAggSup.maxTs, maxTs);
1553
    closeStreamIntervalWindow(pChInfo->aggSup.pResultRowHashTable, &pChInfo->twAggSup, &pChInfo->interval, NULL, NULL,
1554
                              NULL, pOperator);
1555 1556 1557
  }
}

1558 1559
static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWins, int32_t* index,
                                SSDataBlock* pBlock) {
1560 1561 1562 1563 1564 1565 1566 1567
  blockDataCleanup(pBlock);
  int32_t size = taosArrayGetSize(pWins);
  if (*index == size) {
    *index = 0;
    taosArrayClear(pWins);
    return;
  }
  blockDataEnsureCapacity(pBlock, size - *index);
1568
  uint64_t uid = 0;
1569
  for (int32_t i = *index; i < size; i++) {
H
Haojun Liao 已提交
1570
    SWinKey* pWin = taosArrayGet(pWins, i);
1571 1572
    void*    tbname = NULL;
    streamStateGetParName(pInfo->pState, pWin->groupId, &tbname);
1573 1574 1575 1576 1577 1578 1579
    if (tbname == NULL) {
      appendOneRowToStreamSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, NULL);
    } else {
      char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN];
      STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
      appendOneRowToStreamSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, parTbName);
    }
1580
    tdbFree(tbname);
1581
    (*index)++;
5
54liuyao 已提交
1582 1583 1584
  }
}

1585
static void destroyStateWindowOperatorInfo(void* param) {
1586
  SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
1587
  cleanupBasicInfo(&pInfo->binfo);
1588
  taosMemoryFreeClear(pInfo->stateKey.pData);
1589
  cleanupExprSupp(&pInfo->scalarSup);
D
dapan1121 已提交
1590 1591 1592
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
  cleanupAggSup(&pInfo->aggSup);
  cleanupGroupResInfo(&pInfo->groupResInfo);
1593

D
dapan1121 已提交
1594
  taosMemoryFreeClear(param);
1595 1596
}

H
Haojun Liao 已提交
1597
static void freeItem(void* param) {
L
Liu Jicong 已提交
1598
  SGroupKeys* pKey = (SGroupKeys*)param;
H
Haojun Liao 已提交
1599 1600 1601
  taosMemoryFree(pKey->pData);
}

1602
void destroyIntervalOperatorInfo(void* param) {
1603
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param;
1604
  cleanupBasicInfo(&pInfo->binfo);
1605
  cleanupAggSup(&pInfo->aggSup);
1606 1607 1608 1609
  cleanupExprSupp(&pInfo->scalarSupp);

  tdListFree(pInfo->binfo.resultRowInfo.openWindow);

H
Haojun Liao 已提交
1610 1611 1612 1613
  pInfo->pInterpCols = taosArrayDestroy(pInfo->pInterpCols);
  taosArrayDestroyEx(pInfo->pPrevValues, freeItem);

  pInfo->pPrevValues = NULL;
1614

H
Haojun Liao 已提交
1615 1616
  cleanupGroupResInfo(&pInfo->groupResInfo);
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
D
dapan1121 已提交
1617
  taosMemoryFreeClear(param);
1618 1619
}

1620
void destroyStreamFinalIntervalOperatorInfo(void* param) {
1621
  SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)param;
1622
  cleanupBasicInfo(&pInfo->binfo);
5
54liuyao 已提交
1623
  cleanupAggSup(&pInfo->aggSup);
L
Liu Jicong 已提交
1624
  // it should be empty.
5
54liuyao 已提交
1625 1626 1627
  taosHashCleanup(pInfo->pPullDataMap);
  taosArrayDestroy(pInfo->pPullWins);
  blockDataDestroy(pInfo->pPullDataRes);
L
Liu Jicong 已提交
1628 1629
  taosArrayDestroy(pInfo->pDelWins);
  blockDataDestroy(pInfo->pDelRes);
1630
  taosMemoryFreeClear(pInfo->pState);
5
54liuyao 已提交
1631

1632 1633 1634 1635
  if (pInfo->pChildren) {
    int32_t size = taosArrayGetSize(pInfo->pChildren);
    for (int32_t i = 0; i < size; i++) {
      SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, i);
5
54liuyao 已提交
1636
      destroyOperatorInfo(pChildOp);
1637
    }
L
Liu Jicong 已提交
1638
    taosArrayDestroy(pInfo->pChildren);
1639
  }
1640
  nodesDestroyNode((SNode*)pInfo->pPhyNode);
5
54liuyao 已提交
1641
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
5
54liuyao 已提交
1642
  cleanupGroupResInfo(&pInfo->groupResInfo);
5
54liuyao 已提交
1643
  cleanupExprSupp(&pInfo->scalarSupp);
1644

D
dapan1121 已提交
1645
  taosMemoryFreeClear(param);
5
54liuyao 已提交
1646 1647
}

1648
static bool allInvertible(SqlFunctionCtx* pFCtx, int32_t numOfCols) {
5
54liuyao 已提交
1649
  for (int32_t i = 0; i < numOfCols; i++) {
5
54liuyao 已提交
1650
    if (fmIsUserDefinedFunc(pFCtx[i].functionId) || !fmIsInvertible(pFCtx[i].functionId)) {
5
54liuyao 已提交
1651 1652 1653 1654 1655 1656
      return false;
    }
  }
  return true;
}

1657
static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SIntervalAggOperatorInfo* pInfo) {
1658 1659 1660
  // the primary timestamp column
  bool needed = false;

H
Haojun Liao 已提交
1661
  for(int32_t i = 0; i < numOfCols; ++i) {
1662
    SExprInfo* pExpr = pCtx[i].pExpr;
H
Haojun Liao 已提交
1663
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
1664
      needed = true;
H
Haojun Liao 已提交
1665
      break;
1666 1667 1668
    }
  }

H
Haojun Liao 已提交
1669 1670 1671
  if (needed) {
    pInfo->pInterpCols = taosArrayInit(4, sizeof(SColumn));
    pInfo->pPrevValues = taosArrayInit(4, sizeof(SGroupKeys));
1672

H
Haojun Liao 已提交
1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687
    {  // ts column
      SColumn c = {0};
      c.colId = 1;
      c.slotId = pInfo->primaryTsIndex;
      c.type = TSDB_DATA_TYPE_TIMESTAMP;
      c.bytes = sizeof(int64_t);
      taosArrayPush(pInfo->pInterpCols, &c);

      SGroupKeys key;
      key.bytes = c.bytes;
      key.type = c.type;
      key.isNull = true;  // to denote no value is assigned yet
      key.pData = taosMemoryCalloc(1, c.bytes);
      taosArrayPush(pInfo->pPrevValues, &key);
    }
1688 1689
  }

X
Xiaoyu Wang 已提交
1690
  for (int32_t i = 0; i < numOfCols; ++i) {
1691 1692
    SExprInfo* pExpr = pCtx[i].pExpr;

H
Haojun Liao 已提交
1693
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
1694 1695 1696
      SFunctParam* pParam = &pExpr->base.pParam[0];

      SColumn c = *pParam->pCol;
1697
      taosArrayPush(pInfo->pInterpCols, &c);
1698 1699

      SGroupKeys key = {0};
X
Xiaoyu Wang 已提交
1700 1701
      key.bytes = c.bytes;
      key.type = c.type;
1702
      key.isNull = false;
X
Xiaoyu Wang 已提交
1703
      key.pData = taosMemoryCalloc(1, c.bytes);
1704
      taosArrayPush(pInfo->pPrevValues, &key);
1705 1706 1707 1708 1709 1710
    }
  }

  return needed;
}

L
Liu Jicong 已提交
1711
void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SAggSupporter* pSup, SInterval* pInterval,
5
54liuyao 已提交
1712
                            STimeWindowAggSupp* pTwSup) {
1713
  if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
5
54liuyao 已提交
1714
    initIntervalDownStream(downstream->pDownstream[0], type, pSup, pInterval, pTwSup);
1715 1716
    return;
  }
5
54liuyao 已提交
1717
  SStreamScanInfo* pScanInfo = downstream->info;
1718 1719
  pScanInfo->windowSup.parentType = type;
  pScanInfo->windowSup.pIntervalAggSup = pSup;
5
54liuyao 已提交
1720 1721 1722
  if (!pScanInfo->pUpdateInfo) {
    pScanInfo->pUpdateInfo = updateInfoInitP(pInterval, pTwSup->waterMark);
  }
1723
  pScanInfo->interval = *pInterval;
5
54liuyao 已提交
1724
  pScanInfo->twAggSup = *pTwSup;
5
54liuyao 已提交
1725 1726
}

H
Haojun Liao 已提交
1727 1728
void initStreamFunciton(SqlFunctionCtx* pCtx, int32_t numOfExpr) {
  for (int32_t i = 0; i < numOfExpr; i++) {
H
Haojun Liao 已提交
1729
//    pCtx[i].isStream = true;
H
Haojun Liao 已提交
1730 1731 1732
  }
}

H
Haojun Liao 已提交
1733
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode,
L
Liu Jicong 已提交
1734
                                          SExecTaskInfo* pTaskInfo, bool isStream) {
1735
  SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
L
Liu Jicong 已提交
1736
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
1737 1738 1739 1740
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

H
Haojun Liao 已提交
1741
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->window.node.pOutputDataBlockDesc);
H
Haojun Liao 已提交
1742 1743 1744 1745 1746 1747
  initBasicInfo(&pInfo->binfo, pResBlock);

  SExprSupp* pSup = &pOperator->exprSupp;
  pInfo->primaryTsIndex = ((SColumnNode*)pPhyNode->window.pTspk)->slotId;

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
H
Haojun Liao 已提交
1748 1749
  initResultSizeInfo(&pOperator->resultInfo, 512);
  blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
H
Haojun Liao 已提交
1750 1751 1752

  int32_t    num = 0;
  SExprInfo* pExprInfo = createExprInfo(pPhyNode->window.pFuncs, NULL, &num);
H
Haojun Liao 已提交
1753
  int32_t    code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str);
H
Haojun Liao 已提交
1754 1755 1756 1757 1758
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  SInterval interval = {.interval = pPhyNode->interval,
1759 1760 1761 1762 1763
                        .sliding = pPhyNode->sliding,
                        .intervalUnit = pPhyNode->intervalUnit,
                        .slidingUnit = pPhyNode->slidingUnit,
                        .offset = pPhyNode->offset,
                        .precision = ((SColumnNode*)pPhyNode->window.pTspk)->node.resType.precision};
H
Haojun Liao 已提交
1764 1765 1766 1767 1768 1769 1770

  STimeWindowAggSupp as = {
      .waterMark = pPhyNode->window.watermark,
      .calTrigger = pPhyNode->window.triggerType,
      .maxTs = INT64_MIN,
  };

1771
  ASSERT(as.calTrigger != STREAM_TRIGGER_MAX_DELAY);
H
Haojun Liao 已提交
1772

L
Liu Jicong 已提交
1773
  pInfo->win = pTaskInfo->window;
1774 1775
  pInfo->inputOrder = (pPhyNode->window.inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
  pInfo->resultTsOrder = (pPhyNode->window.outputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
H
Haojun Liao 已提交
1776 1777
  pInfo->interval = interval;
  pInfo->twAggSup = as;
1778
  pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock;
1779 1780 1781 1782

  if (pPhyNode->window.pExprs != NULL) {
    int32_t    numOfScalar = 0;
    SExprInfo* pScalarExprInfo = createExprInfo(pPhyNode->window.pExprs, NULL, &numOfScalar);
H
Haojun Liao 已提交
1783
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar);
1784 1785 1786 1787
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }
1788

H
Haojun Liao 已提交
1789 1790 1791 1792 1793
  code = filterInitFromNode((SNode*)pPhyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

1794
  initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
H
Haojun Liao 已提交
1795
  pInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, num, pInfo);
1796
  if (pInfo->timeWindowInterpo) {
1797
    pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
H
Haojun Liao 已提交
1798 1799 1800
    if (pInfo->binfo.resultRowInfo.openWindow == NULL) {
      goto _error;
    }
1801
  }
1802

1803
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
L
Liu Jicong 已提交
1804 1805
  setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
1806

1807
  pOperator->fpSet =
1808
      createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, destroyIntervalOperatorInfo, optrDefaultBufFn, NULL);
1809 1810 1811 1812 1813 1814 1815 1816

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

L
Liu Jicong 已提交
1817
_error:
H
Haojun Liao 已提交
1818 1819 1820
  if (pInfo != NULL) {
    destroyIntervalOperatorInfo(pInfo);
  }
1821 1822 1823 1824 1825
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

1826
// todo handle multiple timeline cases. assume no timeline interweaving
1827 1828
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
1829
  SExprSupp*     pSup = &pOperator->exprSupp;
1830

1831
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
1832 1833

  bool    masterScan = true;
1834
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
H
Haojun Liao 已提交
1835
  int64_t gid = pBlock->info.id.groupId;
1836 1837 1838 1839 1840 1841 1842 1843 1844 1845 1846 1847 1848 1849

  int64_t gap = pInfo->gap;

  if (!pInfo->reptScan) {
    pInfo->reptScan = true;
    pInfo->winSup.prevTs = INT64_MIN;
  }

  SWindowRowsSup* pRowSup = &pInfo->winSup;
  pRowSup->numOfRows = 0;

  // In case of ascending or descending order scan data, only one time window needs to be kepted for each table.
  TSKEY* tsList = (TSKEY*)pColInfoData->pData;
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
1850 1851 1852
    if (gid != pRowSup->groupId || pInfo->winSup.prevTs == INT64_MIN) {
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
      doKeepTuple(pRowSup, tsList[j], gid);
H
Haojun Liao 已提交
1853 1854
    } else if (((tsList[j] - pRowSup->prevTs >= 0) && (tsList[j] - pRowSup->prevTs <= gap)) ||
               ((pRowSup->prevTs - tsList[j] >= 0) && (pRowSup->prevTs - tsList[j] <= gap))) {
1855
      // The gap is less than the threshold, so it belongs to current session window that has been opened already.
1856
      doKeepTuple(pRowSup, tsList[j], gid);
1857 1858 1859 1860 1861 1862 1863 1864 1865 1866
      if (j == 0 && pRowSup->startRowIndex != 0) {
        pRowSup->startRowIndex = 0;
      }
    } else {  // start a new session window
      SResultRow* pResult = NULL;

      // keep the time window for the closed time window.
      STimeWindow window = pRowSup->win;

      pRowSup->win.ekey = pRowSup->win.skey;
1867 1868
      int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
                                           numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1869
      if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
S
Shengliang Guan 已提交
1870
        T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
1871 1872 1873 1874
      }

      // pInfo->numOfRows data belong to the current session window
      updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false);
H
Haojun Liao 已提交
1875
      applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
1876
                       pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
1877 1878

      // here we start a new session window
1879 1880
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
      doKeepTuple(pRowSup, tsList[j], gid);
1881 1882 1883 1884 1885
    }
  }

  SResultRow* pResult = NULL;
  pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
1886 1887
  int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid,
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1888
  if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
S
Shengliang Guan 已提交
1889
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
1890 1891 1892
  }

  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false);
H
Haojun Liao 已提交
1893
  applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows,
H
Haojun Liao 已提交
1894
                   pBlock->info.rows, numOfOutput);
1895 1896
}

1897
static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
1898 1899 1900 1901 1902 1903
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SSessionAggOperatorInfo* pInfo = pOperator->info;
  SOptrBasicInfo*          pBInfo = &pInfo->binfo;
1904
  SExprSupp*               pSup = &pOperator->exprSupp;
1905 1906

  if (pOperator->status == OP_RES_TO_RETURN) {
1907
    while (1) {
1908
      doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
H
Haojun Liao 已提交
1909
      doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
1910

1911
      bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
1912
      if (!hasRemain) {
H
Haojun Liao 已提交
1913
        setOperatorCompleted(pOperator);
1914 1915
        break;
      }
1916

1917 1918 1919 1920 1921
      if (pBInfo->pRes->info.rows > 0) {
        break;
      }
    }
    pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
1922
    return (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
1923 1924
  }

1925 1926 1927
  int64_t st = taosGetTimestampUs();
  int32_t order = TSDB_ORDER_ASC;

1928 1929 1930
  SOperatorInfo* downstream = pOperator->pDownstream[0];

  while (1) {
1931
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
1932 1933 1934 1935 1936
    if (pBlock == NULL) {
      break;
    }

    // the pDataBlock are always the same one, no need to call this again
1937
    setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true);
1938 1939
    blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);

1940 1941 1942
    doSessionWindowAggImpl(pOperator, pInfo, pBlock);
  }

1943 1944
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;

1945 1946 1947
  // restore the value
  pOperator->status = OP_RES_TO_RETURN;

1948
  initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
1949
  blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
1950
  while (1) {
1951
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
H
Haojun Liao 已提交
1952
    doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
1953

1954
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
1955
    if (!hasRemain) {
H
Haojun Liao 已提交
1956
      setOperatorCompleted(pOperator);
1957 1958
      break;
    }
1959

1960 1961 1962 1963 1964
    if (pBInfo->pRes->info.rows > 0) {
      break;
    }
  }
  pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
1965
  return (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
1966 1967
}

1968 1969
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode,
                                             SExecTaskInfo* pTaskInfo) {
1970 1971 1972 1973 1974 1975
  SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

1976 1977 1978
  int32_t      tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
  SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr;

1979 1980 1981
  if (pStateNode->window.pExprs != NULL) {
    int32_t    numOfScalarExpr = 0;
    SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalarExpr);
H
Hongze Cheng 已提交
1982
    int32_t    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr);
1983 1984 1985 1986 1987
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

1988
  pInfo->stateCol = extractColumnFromColumnNode(pColNode);
1989 1990 1991 1992 1993 1994 1995
  pInfo->stateKey.type = pInfo->stateCol.type;
  pInfo->stateKey.bytes = pInfo->stateCol.bytes;
  pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes);
  if (pInfo->stateKey.pData == NULL) {
    goto _error;
  }

H
Haojun Liao 已提交
1996 1997 1998 1999 2000
  int32_t code = filterInitFromNode((SNode*)pStateNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

2001 2002
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;

2003 2004
  int32_t    num = 0;
  SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num);
2005
  initResultSizeInfo(&pOperator->resultInfo, 4096);
H
Haojun Liao 已提交
2006

H
Haojun Liao 已提交
2007
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str);
2008 2009 2010 2011
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
2012
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pStateNode->window.node.pOutputDataBlockDesc);
2013
  initBasicInfo(&pInfo->binfo, pResBlock);
2014
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
2015

L
Liu Jicong 已提交
2016 2017
  pInfo->twAggSup =
      (STimeWindowAggSupp){.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType};
2018

2019 2020
  initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);

X
Xiaoyu Wang 已提交
2021
  pInfo->tsSlotId = tsSlotId;
2022

L
Liu Jicong 已提交
2023 2024
  setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
2025
  pOperator->fpSet =
2026
      createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, destroyStateWindowOperatorInfo, optrDefaultBufFn, NULL);
2027

2028 2029 2030 2031 2032
  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

2033 2034
  return pOperator;

L
Liu Jicong 已提交
2035
_error:
H
Haojun Liao 已提交
2036 2037 2038 2039
  if (pInfo != NULL) {
    destroyStateWindowOperatorInfo(pInfo);
  }

2040 2041
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
2042 2043 2044
  return NULL;
}

2045
void destroySWindowOperatorInfo(void* param) {
2046
  SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*)param;
2047 2048 2049
  if (pInfo == NULL) {
    return;
  }
2050

2051
  cleanupBasicInfo(&pInfo->binfo);
H
Haojun Liao 已提交
2052 2053 2054 2055
  colDataDestroy(&pInfo->twAggSup.timeWindowData);

  cleanupAggSup(&pInfo->aggSup);
  cleanupGroupResInfo(&pInfo->groupResInfo);
D
dapan1121 已提交
2056
  taosMemoryFreeClear(param);
2057 2058
}

H
Haojun Liao 已提交
2059
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
2060
                                            SExecTaskInfo* pTaskInfo) {
2061 2062 2063 2064 2065 2066
  SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo));
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

2067
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
2068
  initResultSizeInfo(&pOperator->resultInfo, 4096);
2069

2070
  int32_t      numOfCols = 0;
H
Haojun Liao 已提交
2071
  SExprInfo*   pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols);
H
Haojun Liao 已提交
2072
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc);
H
Haojun Liao 已提交
2073
  initBasicInfo(&pInfo->binfo, pResBlock);
H
Haojun Liao 已提交
2074

H
Haojun Liao 已提交
2075
  int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
2076 2077 2078 2079
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
2080 2081 2082 2083
  pInfo->twAggSup.waterMark = pSessionNode->window.watermark;
  pInfo->twAggSup.calTrigger = pSessionNode->window.triggerType;
  pInfo->gap = pSessionNode->gap;

2084
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
2085 2086
  initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);

2087
  pInfo->tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
L
Liu Jicong 已提交
2088 2089 2090
  pInfo->binfo.pRes = pResBlock;
  pInfo->winSup.prevTs = INT64_MIN;
  pInfo->reptScan = false;
H
Haojun Liao 已提交
2091 2092 2093 2094
  code = filterInitFromNode((SNode*)pSessionNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
H
Haojun Liao 已提交
2095

L
Liu Jicong 已提交
2096 2097
  setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
2098
  pOperator->fpSet =
2099
      createOperatorFpSet(optrDummyOpenFn, doSessionWindowAgg, NULL, destroySWindowOperatorInfo, optrDefaultBufFn, NULL);
2100 2101
  pOperator->pTaskInfo = pTaskInfo;
  code = appendDownstream(pOperator, &downstream, 1);
2102 2103 2104 2105
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

2106 2107
  return pOperator;

L
Liu Jicong 已提交
2108
_error:
2109
  destroySWindowOperatorInfo(pInfo);
2110 2111 2112
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
L
Liu Jicong 已提交
2113
}
5
54liuyao 已提交
2114

5
54liuyao 已提交
2115
void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int32_t numOfOutput,
2116
                      SExecTaskInfo* pTaskInfo, SColumnInfoData* pTimeWindowData) {
5
54liuyao 已提交
2117 2118
  for (int32_t k = 0; k < numOfOutput; ++k) {
    if (fmIsWindowPseudoColumnFunc(pDestCtx[k].functionId)) {
2119 2120 2121 2122 2123
      if (!pTimeWindowData) {
        continue;
      }

      SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pDestCtx[k]);
L
Liu Jicong 已提交
2124 2125
      char*                p = GET_ROWCELL_INTERBUF(pEntryInfo);
      SColumnInfoData      idata = {0};
2126 2127 2128 2129 2130 2131 2132 2133
      idata.info.type = TSDB_DATA_TYPE_BIGINT;
      idata.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
      idata.pData = p;

      SScalarParam out = {.columnData = &idata};
      SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
      pDestCtx[k].sfp.process(&tw, 1, &out);
      pEntryInfo->numOfRes = 1;
L
Liu Jicong 已提交
2134
    } else if (functionNeedToExecute(&pDestCtx[k]) && pDestCtx[k].fpSet.combine != NULL) {
2135
      int32_t code = pDestCtx[k].fpSet.combine(&pDestCtx[k], &pSourceCtx[k]);
5
54liuyao 已提交
2136 2137 2138
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s apply functions error, code: %s", GET_TASKID(pTaskInfo), tstrerror(code));
        pTaskInfo->code = code;
2139
        T_LONG_JMP(pTaskInfo->env, code);
5
54liuyao 已提交
2140 2141 2142 2143 2144
      }
    }
  }
}

2145 2146
bool hasIntervalWindow(SStreamState* pState, SWinKey* pKey) {
  return TSDB_CODE_SUCCESS == streamStateGet(pState, pKey, NULL, 0);
2147 2148
}

5
54liuyao 已提交
2149
static void rebuildIntervalWindow(SOperatorInfo* pOperator, SArray* pWinArray, SHashObj* pUpdatedMap) {
2150 2151 2152 2153
  SStreamIntervalOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
  int32_t                      size = taosArrayGetSize(pWinArray);
  int32_t                      numOfOutput = pOperator->exprSupp.numOfExprs;
5
54liuyao 已提交
2154
  SExprSupp*                   pSup = &pOperator->exprSupp;
5
54liuyao 已提交
2155 2156 2157
  if (!pInfo->pChildren) {
    return;
  }
5
54liuyao 已提交
2158
  for (int32_t i = 0; i < size; i++) {
H
Haojun Liao 已提交
2159
    SWinKey*    pWinRes = taosArrayGet(pWinArray, i);
2160
    SResultRow* pCurResult = NULL;
2161
    STimeWindow parentWin = getFinalTimeWindow(pWinRes->ts, &pInfo->interval);
5
54liuyao 已提交
2162
    if (isDeletedStreamWindow(&parentWin, pWinRes->groupId, pInfo->pState, &pInfo->twAggSup)) {
2163 2164
      continue;
    }
2165

5
54liuyao 已提交
2166
    int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren);
2167
    int32_t num = 0;
5
54liuyao 已提交
2168
    for (int32_t j = 0; j < numOfChildren; j++) {
2169 2170 2171 2172
      SOperatorInfo*               pChildOp = taosArrayGetP(pInfo->pChildren, j);
      SStreamIntervalOperatorInfo* pChInfo = pChildOp->info;
      SExprSupp*                   pChildSup = &pChildOp->exprSupp;
      if (!hasIntervalWindow(pChInfo->pState, pWinRes)) {
2173 2174
        continue;
      }
2175 2176 2177 2178
      if (num == 0) {
        int32_t code = setOutputBuf(pInfo->pState, &parentWin, &pCurResult, pWinRes->groupId, pSup->pCtx, numOfOutput,
                                    pSup->rowEntryInfoOffset, &pInfo->aggSup);
        if (code != TSDB_CODE_SUCCESS || pCurResult == NULL) {
S
Shengliang Guan 已提交
2179
          T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
2180 2181
        }
      }
2182
      num++;
2183
      SResultRow* pChResult = NULL;
2184 2185
      setOutputBuf(pChInfo->pState, &parentWin, &pChResult, pWinRes->groupId, pChildSup->pCtx, pChildSup->numOfExprs,
                   pChildSup->rowEntryInfoOffset, &pChInfo->aggSup);
2186 2187
      updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin, true);
      compactFunctions(pSup->pCtx, pChildSup->pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
5
54liuyao 已提交
2188
      releaseOutputBuf(pChInfo->pState, pWinRes, pChResult);
5
54liuyao 已提交
2189
    }
2190
    if (num > 0 && pUpdatedMap) {
2191 2192 2193
      saveWinResultInfo(pCurResult->win.skey, pWinRes->groupId, pUpdatedMap);
      saveOutputBuf(pInfo->pState, pWinRes, pCurResult, pInfo->aggSup.resultRowSize);
      releaseOutputBuf(pInfo->pState, pWinRes, pCurResult);
2194
    }
5
54liuyao 已提交
2195 2196 2197 2198 2199
  }
}

bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) {
  SET_RES_WINDOW_KEY(pSup->keyBuf, &pWin->skey, sizeof(int64_t), groupId);
2200
  SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf,
L
Liu Jicong 已提交
2201
                                                               GET_RES_WINDOW_KEY_LEN(sizeof(int64_t)));
5
54liuyao 已提交
2202 2203 2204
  return p1 == NULL;
}

2205
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup) {
5
54liuyao 已提交
2206 2207
  if (pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark) {
    SWinKey key = {.ts = pWin->skey, .groupId = groupId};
5
54liuyao 已提交
2208
    if (streamStateGet(pState, &key, NULL, 0) == TSDB_CODE_SUCCESS) {
5
54liuyao 已提交
2209 2210
      return false;
    }
2211
    return true;
5
54liuyao 已提交
2212 2213 2214 2215
  }
  return false;
}

L
Liu Jicong 已提交
2216 2217 2218 2219
int32_t getNexWindowPos(SInterval* pInterval, SDataBlockInfo* pBlockInfo, TSKEY* tsCols, int32_t startPos, TSKEY eKey,
                        STimeWindow* pNextWin) {
  int32_t forwardRows =
      getNumOfRowsInTimeWindow(pBlockInfo, tsCols, startPos, eKey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
5
54liuyao 已提交
2220 2221 2222 2223
  int32_t prevEndPos = forwardRows - 1 + startPos;
  return getNextQualifiedWindow(pInterval, pNextWin, pBlockInfo, tsCols, prevEndPos, TSDB_ORDER_ASC);
}

H
Haojun Liao 已提交
2224
void addPullWindow(SHashObj* pMap, SWinKey* pWinRes, int32_t size) {
5
54liuyao 已提交
2225 2226 2227 2228
  SArray* childIds = taosArrayInit(8, sizeof(int32_t));
  for (int32_t i = 0; i < size; i++) {
    taosArrayPush(childIds, &i);
  }
H
Haojun Liao 已提交
2229
  taosHashPut(pMap, pWinRes, sizeof(SWinKey), &childIds, sizeof(void*));
5
54liuyao 已提交
2230 2231 2232 2233
}

static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; }

2234
static void clearStreamIntervalOperator(SStreamIntervalOperatorInfo* pInfo) {
2235
  tSimpleHashClear(pInfo->aggSup.pResultRowHashTable);
5
54liuyao 已提交
2236
  clearDiskbasedBuf(pInfo->aggSup.pResultBuf);
2237
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
L
Liu Jicong 已提交
2238
  pInfo->aggSup.currentPageId = -1;
2239
  streamStateClear(pInfo->pState);
5
54liuyao 已提交
2240 2241
}

5
54liuyao 已提交
2242 2243 2244 2245
static void clearSpecialDataBlock(SSDataBlock* pBlock) {
  if (pBlock->info.rows <= 0) {
    return;
  }
5
54liuyao 已提交
2246 2247 2248
  blockDataCleanup(pBlock);
}

5
54liuyao 已提交
2249 2250 2251 2252 2253 2254
static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pBlock) {
  clearSpecialDataBlock(pBlock);
  int32_t size = taosArrayGetSize(array);
  if (size - (*pIndex) == 0) {
    return;
  }
L
Liu Jicong 已提交
2255
  blockDataEnsureCapacity(pBlock, size - (*pIndex));
2256
  ASSERT(3 <= taosArrayGetSize(pBlock->pDataBlock));
2257 2258 2259 2260 2261
  SColumnInfoData* pStartTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pGroupId = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
5
54liuyao 已提交
2262
  for (; (*pIndex) < size; (*pIndex)++) {
L
Liu Jicong 已提交
2263
    SPullWindowInfo* pWin = taosArrayGet(array, (*pIndex));
5
54liuyao 已提交
2264 2265 2266
    colDataAppend(pStartTs, pBlock->info.rows, (const char*)&pWin->window.skey, false);
    colDataAppend(pEndTs, pBlock->info.rows, (const char*)&pWin->window.ekey, false);
    colDataAppend(pGroupId, pBlock->info.rows, (const char*)&pWin->groupId, false);
5
54liuyao 已提交
2267 2268
    colDataAppend(pCalStartTs, pBlock->info.rows, (const char*)&pWin->calWin.skey, false);
    colDataAppend(pCalEndTs, pBlock->info.rows, (const char*)&pWin->calWin.ekey, false);
5
54liuyao 已提交
2269 2270 2271 2272 2273 2274 2275 2276 2277
    pBlock->info.rows++;
  }
  if ((*pIndex) == size) {
    *pIndex = 0;
    taosArrayClear(array);
  }
  blockDataUpdateTsWindow(pBlock, 0);
}

5
54liuyao 已提交
2278
void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SInterval* pInterval) {
5
54liuyao 已提交
2279
  SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
L
Liu Jicong 已提交
2280
  TSKEY*           tsData = (TSKEY*)pStartCol->pData;
5
54liuyao 已提交
2281 2282
  SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  TSKEY*           tsEndData = (TSKEY*)pEndCol->pData;
5
54liuyao 已提交
2283
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
L
Liu Jicong 已提交
2284 2285
  uint64_t*        groupIdData = (uint64_t*)pGroupCol->pData;
  int32_t          chId = getChildIndex(pBlock);
5
54liuyao 已提交
2286
  for (int32_t i = 0; i < pBlock->info.rows; i++) {
5
54liuyao 已提交
2287 2288 2289 2290 2291 2292 2293 2294 2295 2296 2297 2298 2299 2300 2301
    TSKEY winTs = tsData[i];
    while (winTs < tsEndData[i]) {
      SWinKey winRes = {.ts = winTs, .groupId = groupIdData[i]};
      void*   chIds = taosHashGet(pMap, &winRes, sizeof(SWinKey));
      if (chIds) {
        SArray* chArray = *(SArray**)chIds;
        int32_t index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
        if (index != -1) {
          qDebug("===stream===window %" PRId64 " delete child id %d", winRes.ts, chId);
          taosArrayRemove(chArray, index);
          if (taosArrayGetSize(chArray) == 0) {
            // pull data is over
            taosArrayDestroy(chArray);
            taosHashRemove(pMap, &winRes, sizeof(SWinKey));
          }
5
54liuyao 已提交
2302 2303
        }
      }
5
54liuyao 已提交
2304
      winTs = taosTimeAdd(winTs, pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
5
54liuyao 已提交
2305 2306 2307
    }
  }
}
5
54liuyao 已提交
2308

2309
static void addRetriveWindow(SArray* wins, SStreamIntervalOperatorInfo* pInfo) {
2310 2311
  int32_t size = taosArrayGetSize(wins);
  for (int32_t i = 0; i < size; i++) {
L
Liu Jicong 已提交
2312
    SWinKey*    winKey = taosArrayGet(wins, i);
2313
    STimeWindow nextWin = getFinalTimeWindow(winKey->ts, &pInfo->interval);
2314
    if (needDeleteWindowBuf(&nextWin, &pInfo->twAggSup) && !pInfo->ignoreExpiredData) {
2315 2316
      void* chIds = taosHashGet(pInfo->pPullDataMap, winKey, sizeof(SWinKey));
      if (!chIds) {
5
54liuyao 已提交
2317
        SPullWindowInfo pull = {.window = nextWin, .groupId = winKey->groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey};
2318
        // add pull data request
5
54liuyao 已提交
2319 2320 2321 2322 2323
        if (savePullWindow(&pull, pInfo->pPullWins) == TSDB_CODE_SUCCESS) {
          int32_t size1 = taosArrayGetSize(pInfo->pChildren);
          addPullWindow(pInfo->pPullDataMap, winKey, size1);
          qDebug("===stream===prepare retrive for delete %" PRId64 ", size:%d", winKey->ts, size1);
        }
2324 2325 2326 2327 2328
      }
    }
  }
}

5
54liuyao 已提交
2329 2330 2331 2332 2333 2334
static void clearFunctionContext(SExprSupp* pSup) {
  for (int32_t i = 0; i < pSup->numOfExprs; i++) {
    pSup->pCtx[i].saveHandle.currentPage = -1;
  }
}

2335 2336 2337 2338 2339 2340 2341 2342 2343 2344 2345
void doBuildResult(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  // set output datablock version
  pBlock->info.version = pTaskInfo->version;

  blockDataCleanup(pBlock);
  if (!hasRemainResults(pGroupResInfo)) {
    return;
  }

  // clear the existed group id
H
Haojun Liao 已提交
2346
  pBlock->info.id.groupId = 0;
2347
  buildDataBlockFromGroupRes(pOperator, pState, pBlock, &pOperator->exprSupp, pGroupResInfo);
2348 2349 2350 2351 2352 2353 2354 2355 2356 2357 2358 2359 2360 2361 2362
}

static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t groupId,
                                    SHashObj* pUpdatedMap) {
  SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperatorInfo->info;

  SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo);
  SExecTaskInfo*  pTaskInfo = pOperatorInfo->pTaskInfo;
  SExprSupp*      pSup = &pOperatorInfo->exprSupp;
  int32_t         numOfOutput = pSup->numOfExprs;
  int32_t         step = 1;
  TSKEY*          tsCols = NULL;
  SResultRow*     pResult = NULL;
  int32_t         forwardRows = 0;

2363
  ASSERT(pSDataBlock->pDataBlock != NULL);
2364 2365 2366 2367 2368
  SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
  tsCols = (int64_t*)pColDataInfo->pData;

  int32_t     startPos = 0;
  TSKEY       ts = getStartTsKey(&pSDataBlock->info.window, tsCols);
5
54liuyao 已提交
2369 2370 2371 2372 2373 2374
  STimeWindow nextWin = {0};
  if (IS_FINAL_OP(pInfo)) {
    nextWin = getFinalTimeWindow(ts, &pInfo->interval);
  } else {
    nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, TSDB_ORDER_ASC);
  }
2375 2376 2377 2378 2379 2380 2381 2382 2383 2384 2385 2386 2387 2388 2389 2390 2391 2392
  while (1) {
    bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup);
    if ((pInfo->ignoreExpiredData && isClosed) || !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) {
      startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin);
      if (startPos < 0) {
        break;
      }
      continue;
    }

    if (IS_FINAL_OP(pInfo) && isClosed && pInfo->pChildren) {
      bool    ignore = true;
      SWinKey winRes = {
          .ts = nextWin.skey,
          .groupId = groupId,
      };
      void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey));
      if (isDeletedStreamWindow(&nextWin, groupId, pInfo->pState, &pInfo->twAggSup) && !chIds) {
5
54liuyao 已提交
2393
        SPullWindowInfo pull = {.window = nextWin, .groupId = groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey};
2394
        // add pull data request
5
54liuyao 已提交
2395 2396 2397 2398 2399
        if (savePullWindow(&pull, pInfo->pPullWins) == TSDB_CODE_SUCCESS) {
          int32_t size = taosArrayGetSize(pInfo->pChildren);
          addPullWindow(pInfo->pPullDataMap, &winRes, size);
          qDebug("===stream===prepare retrive %" PRId64 ", size:%d", winRes.ts, size);
        }
2400 2401 2402 2403 2404 2405 2406 2407 2408 2409 2410 2411 2412 2413 2414 2415 2416 2417 2418 2419 2420 2421 2422 2423 2424 2425
      } else {
        int32_t index = -1;
        SArray* chArray = NULL;
        int32_t chId = 0;
        if (chIds) {
          chArray = *(void**)chIds;
          chId = getChildIndex(pSDataBlock);
          index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
        }
        if (index == -1 || pSDataBlock->info.type == STREAM_PULL_DATA) {
          ignore = false;
        }
      }

      if (ignore) {
        startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin);
        if (startPos < 0) {
          break;
        }
        continue;
      }
    }

    int32_t code = setOutputBuf(pInfo->pState, &nextWin, &pResult, groupId, pSup->pCtx, numOfOutput,
                                pSup->rowEntryInfoOffset, &pInfo->aggSup);
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
S
Shengliang Guan 已提交
2426
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
2427 2428
    }

5
54liuyao 已提交
2429 2430 2431 2432 2433 2434
    if (IS_FINAL_OP(pInfo)) {
      forwardRows = 1;
    } else {
      forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey,
                                             NULL, TSDB_ORDER_ASC);
    }
2435 2436 2437
    if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) {
      saveWinResultInfo(pResult->win.skey, groupId, pUpdatedMap);
    }
5
54liuyao 已提交
2438 2439 2440 2441 2442 2443 2444 2445

    if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
      SWinKey key = {
          .ts = pResult->win.skey,
          .groupId = groupId,
      };
      tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), NULL, 0);
    }
2446
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
H
Haojun Liao 已提交
2447
    applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
2448 2449 2450 2451 2452 2453 2454 2455 2456 2457 2458
                     pSDataBlock->info.rows, numOfOutput);
    SWinKey key = {
        .ts = nextWin.skey,
        .groupId = groupId,
    };
    saveOutputBuf(pInfo->pState, &key, pResult, pInfo->aggSup.resultRowSize);
    releaseOutputBuf(pInfo->pState, &key, pResult);
    if (pInfo->delKey.ts > key.ts) {
      pInfo->delKey = key;
    }
    int32_t prevEndPos = (forwardRows - 1) * step + startPos;
2459
    ASSERT(pSDataBlock->info.window.skey > 0 && pSDataBlock->info.window.ekey > 0);
2460 2461 2462 2463 2464 2465 2466 2467
    startPos =
        getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, TSDB_ORDER_ASC);
    if (startPos < 0) {
      break;
    }
  }
}

5
54liuyao 已提交
2468
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
2469
  SStreamIntervalOperatorInfo* pInfo = pOperator->info;
L
Liu Jicong 已提交
2470
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
L
Liu Jicong 已提交
2471 2472 2473

  SOperatorInfo* downstream = pOperator->pDownstream[0];
  TSKEY          maxTs = INT64_MIN;
5
54liuyao 已提交
2474
  TSKEY          minTs = INT64_MAX;
5
54liuyao 已提交
2475

2476 2477
  SExprSupp* pSup = &pOperator->exprSupp;

5
54liuyao 已提交
2478
  qDebug("interval status %d %s", pOperator->status, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
L
Liu Jicong 已提交
2479

5
54liuyao 已提交
2480 2481 2482
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  } else if (pOperator->status == OP_RES_TO_RETURN) {
5
54liuyao 已提交
2483 2484 2485
    doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes);
    if (pInfo->pPullDataRes->info.rows != 0) {
      // process the rest of the data
2486
      ASSERT(IS_FINAL_OP(pInfo));
5
54liuyao 已提交
2487
      printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
5
54liuyao 已提交
2488 2489 2490
      return pInfo->pPullDataRes;
    }

2491
    doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
2492 2493 2494 2495 2496 2497
    if (pInfo->pDelRes->info.rows != 0) {
      // process the rest of the data
      printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
      return pInfo->pDelRes;
    }

2498
    doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
5
54liuyao 已提交
2499 2500 2501
    if (pInfo->binfo.pRes->info.rows != 0) {
      printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
      return pInfo->binfo.pRes;
5
54liuyao 已提交
2502
    }
5
54liuyao 已提交
2503

H
Haojun Liao 已提交
2504
    setOperatorCompleted(pOperator);
5
54liuyao 已提交
2505 2506 2507 2508 2509 2510
    if (!IS_FINAL_OP(pInfo)) {
      clearFunctionContext(&pOperator->exprSupp);
      // semi interval operator clear disk buffer
      clearStreamIntervalOperator(pInfo);
      qDebug("===stream===clear semi operator");
    } else {
2511 2512
      deleteIntervalDiscBuf(pInfo->pState, pInfo->pPullDataMap, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark,
                            &pInfo->interval, &pInfo->delKey);
L
Liu Jicong 已提交
2513
      streamStateCommit(pTaskInfo->streamInfo.pState);
5
54liuyao 已提交
2514 2515
    }
    return NULL;
5
54liuyao 已提交
2516
  } else {
5
54liuyao 已提交
2517
    if (!IS_FINAL_OP(pInfo)) {
2518
      doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
2519 2520 2521 2522 2523 2524
      if (pInfo->pDelRes->info.rows != 0) {
        // process the rest of the data
        printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
        return pInfo->pDelRes;
      }

2525
      doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
5
54liuyao 已提交
2526
      if (pInfo->binfo.pRes->info.rows != 0) {
5
54liuyao 已提交
2527
        printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
5
54liuyao 已提交
2528 2529
        return pInfo->binfo.pRes;
      }
5
54liuyao 已提交
2530
    }
5
54liuyao 已提交
2531 2532
  }

5
54liuyao 已提交
2533 2534 2535
  SArray*    pUpdated = taosArrayInit(4, POINTER_BYTES);
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  SHashObj*  pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
5
54liuyao 已提交
2536 2537 2538
  while (1) {
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
    if (pBlock == NULL) {
5
54liuyao 已提交
2539
      pOperator->status = OP_RES_TO_RETURN;
5
54liuyao 已提交
2540 2541
      qDebug("===stream===return data:%s. recv datablock num:%" PRIu64 , IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", pInfo->numOfDatapack);
      pInfo->numOfDatapack = 0;
5
54liuyao 已提交
2542 2543
      break;
    }
5
54liuyao 已提交
2544
    pInfo->numOfDatapack++;
5
54liuyao 已提交
2545
    printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval final recv" : "interval semi recv");
2546

2547
    ASSERT(pBlock->info.type != STREAM_INVERT);
H
Haojun Liao 已提交
2548
    if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) {
5
54liuyao 已提交
2549
      pInfo->binfo.pRes->info.type = pBlock->info.type;
2550 2551
    } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
               pBlock->info.type == STREAM_CLEAR) {
2552
      SArray* delWins = taosArrayInit(8, sizeof(SWinKey));
2553
      doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pUpdatedMap);
2554
      if (IS_FINAL_OP(pInfo)) {
2555 2556 2557 2558
        int32_t                      childIndex = getChildIndex(pBlock);
        SOperatorInfo*               pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
        SStreamIntervalOperatorInfo* pChildInfo = pChildOp->info;
        SExprSupp*                   pChildSup = &pChildOp->exprSupp;
2559
        doDeleteWindows(pChildOp, &pChildInfo->interval, pBlock, NULL, NULL);
5
54liuyao 已提交
2560
        rebuildIntervalWindow(pOperator, delWins, pUpdatedMap);
2561 2562 2563
        addRetriveWindow(delWins, pInfo);
        taosArrayAddAll(pInfo->pDelWins, delWins);
        taosArrayDestroy(delWins);
2564 2565
        continue;
      }
2566 2567 2568
      removeResults(delWins, pUpdatedMap);
      taosArrayAddAll(pInfo->pDelWins, delWins);
      taosArrayDestroy(delWins);
2569
      break;
5
54liuyao 已提交
2570
    } else if (pBlock->info.type == STREAM_GET_ALL && IS_FINAL_OP(pInfo)) {
5
54liuyao 已提交
2571
      getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap);
5
54liuyao 已提交
2572
      continue;
5
54liuyao 已提交
2573
    } else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) {
2574
      doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pUpdatedMap);
5
54liuyao 已提交
2575 2576 2577 2578
      if (taosArrayGetSize(pUpdated) > 0) {
        break;
      }
      continue;
L
Liu Jicong 已提交
2579
    } else if (pBlock->info.type == STREAM_PULL_OVER && IS_FINAL_OP(pInfo)) {
5
54liuyao 已提交
2580
      processPullOver(pBlock, pInfo->pPullDataMap, &pInfo->interval);
5
54liuyao 已提交
2581
      continue;
5
54liuyao 已提交
2582
    }
5
54liuyao 已提交
2583

5
54liuyao 已提交
2584 2585 2586 2587
    if (pInfo->scalarSupp.pExprInfo != NULL) {
      SExprSupp* pExprSup = &pInfo->scalarSupp;
      projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
    }
2588
    setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
H
Haojun Liao 已提交
2589
    doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pUpdatedMap);
5
54liuyao 已提交
2590
    if (IS_FINAL_OP(pInfo)) {
S
shenglian zhou 已提交
2591
      int32_t chIndex = getChildIndex(pBlock);
5
54liuyao 已提交
2592 2593 2594 2595 2596
      int32_t size = taosArrayGetSize(pInfo->pChildren);
      // if chIndex + 1 - size > 0, add new child
      for (int32_t i = 0; i < chIndex + 1 - size; i++) {
        SOperatorInfo* pChildOp = createStreamFinalIntervalOperatorInfo(NULL, pInfo->pPhyNode, pOperator->pTaskInfo, 0);
        if (!pChildOp) {
S
Shengliang Guan 已提交
2597
          T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
5
54liuyao 已提交
2598
        }
2599
        SStreamIntervalOperatorInfo* pTmpInfo = pChildOp->info;
2600
        pTmpInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
5
54liuyao 已提交
2601
        taosArrayPush(pInfo->pChildren, &pChildOp);
5
54liuyao 已提交
2602
        qDebug("===stream===add child, id:%d", chIndex);
5
54liuyao 已提交
2603
      }
2604 2605
      SOperatorInfo*               pChildOp = taosArrayGetP(pInfo->pChildren, chIndex);
      SStreamIntervalOperatorInfo* pChInfo = pChildOp->info;
2606
      setInputDataBlock(&pChildOp->exprSupp, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
H
Haojun Liao 已提交
2607
      doStreamIntervalAggImpl(pChildOp, pBlock, pBlock->info.id.groupId, NULL);
5
54liuyao 已提交
2608
    }
5
54liuyao 已提交
2609 2610 2611
    maxTs = TMAX(maxTs, pBlock->info.window.ekey);
    maxTs = TMAX(maxTs, pBlock->info.watermark);
    minTs = TMIN(minTs, pBlock->info.window.skey);
5
54liuyao 已提交
2612
  }
S
shenglian zhou 已提交
2613

2614
  removeDeleteResults(pUpdatedMap, pInfo->pDelWins);
5
54liuyao 已提交
2615
  pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
5
54liuyao 已提交
2616
  pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs);
5
54liuyao 已提交
2617
  if (IS_FINAL_OP(pInfo)) {
2618
    closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval,
2619
                              pInfo->pPullDataMap, pUpdatedMap, pInfo->pDelWins, pOperator);
2620
    closeChildIntervalWindow(pOperator, pInfo->pChildren, pInfo->twAggSup.maxTs);
5
54liuyao 已提交
2621
  }
2622
  pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs;
5
54liuyao 已提交
2623

5
54liuyao 已提交
2624 2625 2626 2627 2628 2629 2630
  void* pIte = NULL;
  while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) {
    taosArrayPush(pUpdated, pIte);
  }
  taosHashCleanup(pUpdatedMap);
  taosArraySort(pUpdated, resultrowComparAsc);

5
54liuyao 已提交
2631 2632
  initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
  blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
5
54liuyao 已提交
2633 2634 2635 2636

  doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes);
  if (pInfo->pPullDataRes->info.rows != 0) {
    // process the rest of the data
2637
    ASSERT(IS_FINAL_OP(pInfo));
5
54liuyao 已提交
2638
    printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
5
54liuyao 已提交
2639 2640 2641
    return pInfo->pPullDataRes;
  }

2642
  doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
5
54liuyao 已提交
2643 2644 2645 2646 2647 2648
  if (pInfo->pDelRes->info.rows != 0) {
    // process the rest of the data
    printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
    return pInfo->pDelRes;
  }

2649
  doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
5
54liuyao 已提交
2650
  if (pInfo->binfo.pRes->info.rows != 0) {
5
54liuyao 已提交
2651
    printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
5
54liuyao 已提交
2652 2653 2654 2655 2656 2657
    return pInfo->binfo.pRes;
  }

  return NULL;
}

5
54liuyao 已提交
2658 2659 2660 2661 2662 2663 2664 2665 2666
int64_t getDeleteMark(SIntervalPhysiNode* pIntervalPhyNode) {
  if (pIntervalPhyNode->window.deleteMark <= 0) {
    return DEAULT_DELETE_MARK;
  }
  int64_t deleteMark = TMAX(pIntervalPhyNode->window.deleteMark,pIntervalPhyNode->window.watermark);
  deleteMark = TMAX(deleteMark, pIntervalPhyNode->interval);
  return deleteMark;
}

S
shenglian zhou 已提交
2667 2668
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
                                                     SExecTaskInfo* pTaskInfo, int32_t numOfChild) {
2669 2670 2671
  SIntervalPhysiNode*          pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
  SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo));
  SOperatorInfo*               pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
5
54liuyao 已提交
2672 2673 2674
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }
2675

2676
  pOperator->pTaskInfo = pTaskInfo;
S
shenglian zhou 已提交
2677 2678 2679 2680 2681 2682 2683 2684
  pInfo->interval = (SInterval){.interval = pIntervalPhyNode->interval,
                                .sliding = pIntervalPhyNode->sliding,
                                .intervalUnit = pIntervalPhyNode->intervalUnit,
                                .slidingUnit = pIntervalPhyNode->slidingUnit,
                                .offset = pIntervalPhyNode->offset,
                                .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
  pInfo->twAggSup = (STimeWindowAggSupp){
      .waterMark = pIntervalPhyNode->window.watermark,
5
54liuyao 已提交
2685 2686
      .calTrigger = pIntervalPhyNode->window.triggerType,
      .maxTs = INT64_MIN,
5
54liuyao 已提交
2687
      .minTs = INT64_MAX,
5
54liuyao 已提交
2688
      .deleteMark = getDeleteMark(pIntervalPhyNode),
L
Liu Jicong 已提交
2689 2690
      .deleteMarkSaved = 0,
      .calTriggerSaved = 0,
S
shenglian zhou 已提交
2691
  };
2692
  ASSERT(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY);
5
54liuyao 已提交
2693 2694
  pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
2695
  initResultSizeInfo(&pOperator->resultInfo, 4096);
5
54liuyao 已提交
2696 2697 2698 2699 2700 2701 2702 2703 2704
  if (pIntervalPhyNode->window.pExprs != NULL) {
    int32_t    numOfScalar = 0;
    SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar);
    int32_t    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

S
shenglian zhou 已提交
2705 2706
  int32_t      numOfCols = 0;
  SExprInfo*   pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols);
H
Haojun Liao 已提交
2707
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
5
54liuyao 已提交
2708
  initBasicInfo(&pInfo->binfo, pResBlock);
2709

H
Haojun Liao 已提交
2710
  int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
2711 2712 2713 2714
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
2715
  initStreamFunciton(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs);
2716

2717
  ASSERT(numOfCols > 0);
5
54liuyao 已提交
2718
  initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
2719

2720 2721 2722 2723
  pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
  *(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
  streamStateSetNumber(pInfo->pState, -1);

2724
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
5
54liuyao 已提交
2725 2726
  pInfo->pChildren = NULL;
  if (numOfChild > 0) {
2727
    pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*));
5
54liuyao 已提交
2728 2729 2730
    for (int32_t i = 0; i < numOfChild; i++) {
      SOperatorInfo* pChildOp = createStreamFinalIntervalOperatorInfo(NULL, pPhyNode, pTaskInfo, 0);
      if (pChildOp) {
2731
        SStreamIntervalOperatorInfo* pChInfo = pChildOp->info;
2732
        pChInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
5
54liuyao 已提交
2733
        taosArrayPush(pInfo->pChildren, &pChildOp);
2734
        streamStateSetNumber(pChInfo->pState, i);
5
54liuyao 已提交
2735 2736 2737 2738 2739
        continue;
      }
      goto _error;
    }
  }
5
54liuyao 已提交
2740

2741
  pInfo->pPhyNode = (SPhysiNode*)nodesCloneNode((SNode*)pPhyNode);
5
54liuyao 已提交
2742

5
54liuyao 已提交
2743 2744 2745 2746
  if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
    pInfo->isFinal = true;
    pOperator->name = "StreamFinalIntervalOperator";
  } else {
5
54liuyao 已提交
2747
    // semi interval operator does not catch result
5
54liuyao 已提交
2748 2749
    pInfo->isFinal = false;
    pOperator->name = "StreamSemiIntervalOperator";
2750
    ASSERT(pInfo->aggSup.currentPageId == -1);
5
54liuyao 已提交
2751 2752
  }

5
54liuyao 已提交
2753
  if (!IS_FINAL_OP(pInfo) || numOfChild == 0) {
5
54liuyao 已提交
2754 2755
    pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
  }
5
54liuyao 已提交
2756 2757 2758 2759
  pInfo->pPullWins = taosArrayInit(8, sizeof(SPullWindowInfo));
  pInfo->pullIndex = 0;
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  pInfo->pPullDataMap = taosHashInit(64, hashFn, false, HASH_NO_LOCK);
2760
  pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE);
5
54liuyao 已提交
2761
  pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired;
2762
  pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
2763
  pInfo->delIndex = 0;
H
Haojun Liao 已提交
2764
  pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
2765 2766
  pInfo->delKey.ts = INT64_MAX;
  pInfo->delKey.groupId = 0;
5
54liuyao 已提交
2767
  pInfo->numOfDatapack = 0;
5
54liuyao 已提交
2768

5
54liuyao 已提交
2769
  pOperator->operatorType = pPhyNode->type;
5
54liuyao 已提交
2770 2771 2772 2773
  pOperator->blocking = true;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;

S
shenglian zhou 已提交
2774
  pOperator->fpSet =
2775
      createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL);
2776
  if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
5
54liuyao 已提交
2777
    initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup);
2778
  }
5
54liuyao 已提交
2779 2780 2781 2782 2783 2784 2785 2786
  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
2787
  destroyStreamFinalIntervalOperatorInfo(pInfo);
5
54liuyao 已提交
2788 2789 2790
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
5
54liuyao 已提交
2791
}
5
54liuyao 已提交
2792 2793

void destroyStreamAggSupporter(SStreamAggSupporter* pSup) {
5
54liuyao 已提交
2794
  tSimpleHashCleanup(pSup->pResultRows);
5
54liuyao 已提交
2795 2796
  destroyDiskbasedBuf(pSup->pResultBuf);
  blockDataDestroy(pSup->pScanBlock);
5
54liuyao 已提交
2797 2798
  taosMemoryFreeClear(pSup->pState);
  taosMemoryFreeClear(pSup->pDummyCtx);
5
54liuyao 已提交
2799 2800
}

2801
void destroyStreamSessionAggOperatorInfo(void* param) {
5
54liuyao 已提交
2802
  SStreamSessionAggOperatorInfo* pInfo = (SStreamSessionAggOperatorInfo*)param;
2803
  cleanupBasicInfo(&pInfo->binfo);
5
54liuyao 已提交
2804
  destroyStreamAggSupporter(&pInfo->streamAggSup);
5
54liuyao 已提交
2805

2806 2807 2808
  if (pInfo->pChildren != NULL) {
    int32_t size = taosArrayGetSize(pInfo->pChildren);
    for (int32_t i = 0; i < size; i++) {
5
54liuyao 已提交
2809 2810
      SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, i);
      destroyOperatorInfo(pChild);
2811
    }
5
54liuyao 已提交
2812
    taosArrayDestroy(pInfo->pChildren);
2813
  }
5
54liuyao 已提交
2814 2815 2816 2817
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
  blockDataDestroy(pInfo->pDelRes);
  blockDataDestroy(pInfo->pWinBlock);
  blockDataDestroy(pInfo->pUpdateRes);
5
54liuyao 已提交
2818
  tSimpleHashCleanup(pInfo->pStDeleted);
2819

D
dapan1121 已提交
2820
  taosMemoryFreeClear(param);
5
54liuyao 已提交
2821 2822
}

2823 2824
int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfCols,
                        SSDataBlock* pResultBlock) {
H
Haojun Liao 已提交
2825
  initBasicInfo(pBasicInfo, pResultBlock);
2826 2827 2828 2829
  int32_t code = initExprSupp(pSup, pExprInfo, numOfCols);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }
2830

H
Haojun Liao 已提交
2831
  initStreamFunciton(pSup->pCtx, pSup->numOfExprs);
5
54liuyao 已提交
2832
  for (int32_t i = 0; i < numOfCols; ++i) {
2833
    pSup->pCtx[i].saveHandle.pBuf = NULL;
5
54liuyao 已提交
2834
  }
2835

2836
  ASSERT(numOfCols > 0);
5
54liuyao 已提交
2837 2838 2839 2840 2841 2842 2843 2844
  return TSDB_CODE_SUCCESS;
}

void initDummyFunction(SqlFunctionCtx* pDummy, SqlFunctionCtx* pCtx, int32_t nums) {
  for (int i = 0; i < nums; i++) {
    pDummy[i].functionId = pCtx[i].functionId;
  }
}
5
54liuyao 已提交
2845

5
54liuyao 已提交
2846 2847
void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, int32_t tsColIndex,
                    STimeWindowAggSupp* pTwSup) {
2848 2849 2850 2851 2852 2853
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION) {
    SStreamPartitionOperatorInfo* pScanInfo = downstream->info;
    pScanInfo->tsColIndex = tsColIndex;
  }

  if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
5
54liuyao 已提交
2854
    initDownStream(downstream->pDownstream[0], pAggSup, type, tsColIndex, pTwSup);
2855 2856
    return;
  }
2857
  SStreamScanInfo* pScanInfo = downstream->info;
5
54liuyao 已提交
2858
  pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type};
5
54liuyao 已提交
2859
  if (!pScanInfo->pUpdateInfo) {
5
54liuyao 已提交
2860
    pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark);
5
54liuyao 已提交
2861
  }
5
54liuyao 已提交
2862
  pScanInfo->twAggSup = *pTwSup;
5
54liuyao 已提交
2863 2864
}

5
54liuyao 已提交
2865 2866 2867 2868 2869 2870 2871 2872 2873 2874
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, int64_t gap,
                               SStreamState* pState, int32_t keySize, int16_t keyType) {
  pSup->resultRowSize = keySize + getResultRowSize(pCtx, numOfOutput);
  pSup->pScanBlock = createSpecialDataBlock(STREAM_CLEAR);
  pSup->gap = gap;
  pSup->stateKeySize = keySize;
  pSup->stateKeyType = keyType;
  pSup->pDummyCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfOutput, sizeof(SqlFunctionCtx));
  if (pSup->pDummyCtx == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
5
54liuyao 已提交
2875
  }
H
Haojun Liao 已提交
2876

5
54liuyao 已提交
2877 2878 2879 2880
  initDummyFunction(pSup->pDummyCtx, pCtx, numOfOutput);
  pSup->pState = taosMemoryCalloc(1, sizeof(SStreamState));
  *(pSup->pState) = *pState;
  streamStateSetNumber(pSup->pState, -1);
2881

5
54liuyao 已提交
2882 2883
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  pSup->pResultRows = tSimpleHashInit(32, hashFn);
X
Xiaoyu Wang 已提交
2884

5
54liuyao 已提交
2885 2886 2887
  int32_t pageSize = 4096;
  while (pageSize < pSup->resultRowSize * 4) {
    pageSize <<= 1u;
5
54liuyao 已提交
2888
  }
5
54liuyao 已提交
2889 2890 2891 2892
  // at least four pages need to be in buffer
  int32_t bufSize = 4096 * 256;
  if (bufSize <= pageSize) {
    bufSize = pageSize * 4;
5
54liuyao 已提交
2893
  }
5
54liuyao 已提交
2894 2895 2896 2897
  if (!osTempSpaceAvailable()) {
    terrno = TSDB_CODE_NO_AVAIL_DISK;
    qError("Init stream agg supporter failed since %s", terrstr(terrno));
    return terrno;
5
54liuyao 已提交
2898
  }
5
54liuyao 已提交
2899 2900 2901
  int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, "function", tsTempDir);
  for (int32_t i = 0; i < numOfOutput; ++i) {
    pCtx[i].saveHandle.pBuf = pSup->pResultBuf;
5
54liuyao 已提交
2902 2903
  }

5
54liuyao 已提交
2904
  return TSDB_CODE_SUCCESS;
5
54liuyao 已提交
2905
}
5
54liuyao 已提交
2906 2907

bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap) {
5
54liuyao 已提交
2908
  if (ts + gap >= pWin->skey && ts - gap <= pWin->ekey) {
5
54liuyao 已提交
2909 2910 2911 2912 2913
    return true;
  }
  return false;
}

5
54liuyao 已提交
2914 2915
bool isInWindow(SResultWindowInfo* pWinInfo, TSKEY ts, int64_t gap) {
  return isInTimeWindow(&pWinInfo->sessionWin.win, ts, gap);
5
54liuyao 已提交
2916 2917
}

5
54liuyao 已提交
2918 2919 2920 2921 2922
void getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
                         SSessionKey* pKey) {
  pKey->win.skey = startTs;
  pKey->win.ekey = endTs;
  pKey->groupId = groupId;
2923
  int32_t code = streamStateSessionGetKeyByRange(pAggSup->pState, pKey, pKey);
5
54liuyao 已提交
2924 2925
  if (code != TSDB_CODE_SUCCESS) {
    SET_SESSION_WIN_KEY_INVALID(pKey);
2926 2927 2928
  }
}

5
54liuyao 已提交
2929
bool isInvalidSessionWin(SResultWindowInfo* pWinInfo) { return pWinInfo->sessionWin.win.skey == 0; }
5
54liuyao 已提交
2930

5
54liuyao 已提交
2931 2932 2933
void setSessionOutputBuf(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
                         SResultWindowInfo* pCurWin) {
  pCurWin->sessionWin.groupId = groupId;
2934 2935
  pCurWin->sessionWin.win.skey = startTs;
  pCurWin->sessionWin.win.ekey = endTs;
5
54liuyao 已提交
2936
  int32_t size = pAggSup->resultRowSize;
2937 2938
  int32_t code =
      streamStateSessionAddIfNotExist(pAggSup->pState, &pCurWin->sessionWin, pAggSup->gap, &pCurWin->pOutputBuf, &size);
5
54liuyao 已提交
2939 2940 2941 2942 2943
  if (code == TSDB_CODE_SUCCESS) {
    pCurWin->isOutput = true;
  } else {
    pCurWin->sessionWin.win.skey = startTs;
    pCurWin->sessionWin.win.ekey = endTs;
5
54liuyao 已提交
2944
  }
5
54liuyao 已提交
2945
}
5
54liuyao 已提交
2946

5
54liuyao 已提交
2947 2948
int32_t getSessionWinBuf(SStreamAggSupporter* pAggSup, SStreamStateCur* pCur, SResultWindowInfo* pWinInfo) {
  int32_t size = 0;
2949
  int32_t code = streamStateSessionGetKVByCur(pCur, &pWinInfo->sessionWin, &pWinInfo->pOutputBuf, &size);
5
54liuyao 已提交
2950 2951
  if (code != TSDB_CODE_SUCCESS) {
    return code;
5
54liuyao 已提交
2952
  }
5
54liuyao 已提交
2953 2954 2955 2956 2957 2958
  streamStateCurNext(pAggSup->pState, pCur);
  return TSDB_CODE_SUCCESS;
}
void saveDeleteInfo(SArray* pWins, SSessionKey key) {
  // key.win.ekey = key.win.skey;
  taosArrayPush(pWins, &key);
5
54liuyao 已提交
2959 2960
}

5
54liuyao 已提交
2961 2962 2963 2964
void saveDeleteRes(SSHashObj* pStDelete, SSessionKey key) {
  key.win.ekey = key.win.skey;
  tSimpleHashPut(pStDelete, &key, sizeof(SSessionKey), NULL, 0);
}
2965

5
54liuyao 已提交
2966 2967 2968 2969 2970
static void removeSessionResult(SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey key) {
  key.win.ekey = key.win.skey;
  tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey));
  tSimpleHashRemove(pResMap, &key, sizeof(SSessionKey));
}
5
54liuyao 已提交
2971

5
54liuyao 已提交
2972 2973 2974 2975 2976
static void getSessionHashKey(const SSessionKey* pKey, SSessionKey* pHashKey) {
  *pHashKey = *pKey;
  pHashKey->win.ekey = pKey->win.skey;
}

5
54liuyao 已提交
2977 2978 2979
static void removeSessionResults(SSHashObj* pHashMap, SArray* pWins) {
  if (tSimpleHashGetSize(pHashMap) == 0) {
    return;
5
54liuyao 已提交
2980
  }
5
54liuyao 已提交
2981 2982 2983 2984
  int32_t size = taosArrayGetSize(pWins);
  for (int32_t i = 0; i < size; i++) {
    SSessionKey* pWin = taosArrayGet(pWins, i);
    if (!pWin) continue;
5
54liuyao 已提交
2985 2986
    SSessionKey key = {0};
    getSessionHashKey(pWin, &key);
5
54liuyao 已提交
2987
    tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey));
5
54liuyao 已提交
2988 2989 2990
  }
}

dengyihao's avatar
dengyihao 已提交
2991
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t groupId,
5
54liuyao 已提交
2992 2993
                                int32_t rows, int32_t start, int64_t gap, SSHashObj* pResultRows, SSHashObj* pStUpdated,
                                SSHashObj* pStDeleted) {
5
54liuyao 已提交
2994
  for (int32_t i = start; i < rows; ++i) {
2995
    if (!isInWindow(pWinInfo, pStartTs[i], gap) && (!pEndTs || !isInWindow(pWinInfo, pEndTs[i], gap))) {
5
54liuyao 已提交
2996 2997
      return i - start;
    }
5
54liuyao 已提交
2998
    if (pWinInfo->sessionWin.win.skey > pStartTs[i]) {
5
54liuyao 已提交
2999
      if (pStDeleted && pWinInfo->isOutput) {
5
54liuyao 已提交
3000
        saveDeleteRes(pStDeleted, pWinInfo->sessionWin);
5
54liuyao 已提交
3001
      }
5
54liuyao 已提交
3002 3003
      removeSessionResult(pStUpdated, pResultRows, pWinInfo->sessionWin);
      pWinInfo->sessionWin.win.skey = pStartTs[i];
5
54liuyao 已提交
3004
    }
5
54liuyao 已提交
3005
    pWinInfo->sessionWin.win.ekey = TMAX(pWinInfo->sessionWin.win.ekey, pStartTs[i]);
5
54liuyao 已提交
3006
    if (pEndTs) {
5
54liuyao 已提交
3007
      pWinInfo->sessionWin.win.ekey = TMAX(pWinInfo->sessionWin.win.ekey, pEndTs[i]);
5
54liuyao 已提交
3008 3009 3010 3011 3012
    }
  }
  return rows - start;
}

5
54liuyao 已提交
3013 3014
static int32_t initSessionOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pResult, SqlFunctionCtx* pCtx,
                                    int32_t numOfOutput, int32_t* rowEntryInfoOffset) {
3015
  ASSERT(pWinInfo->sessionWin.win.skey <= pWinInfo->sessionWin.win.ekey);
5
54liuyao 已提交
3016
  *pResult = (SResultRow*)pWinInfo->pOutputBuf;
5
54liuyao 已提交
3017
  // set time window for current result
5
54liuyao 已提交
3018
  (*pResult)->win = pWinInfo->sessionWin.win;
3019
  setResultRowInitCtx(*pResult, pCtx, numOfOutput, rowEntryInfoOffset);
5
54liuyao 已提交
3020 3021 3022
  return TSDB_CODE_SUCCESS;
}

5
54liuyao 已提交
3023 3024 3025
static int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindowInfo* pCurWin, SResultRow** pResult,
                                  int32_t startIndex, int32_t winRows, int32_t rows, int32_t numOutput,
                                  SOperatorInfo* pOperator) {
3026
  SExprSupp*     pSup = &pOperator->exprSupp;
3027
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
5
54liuyao 已提交
3028
  int32_t        code = initSessionOutputBuf(pCurWin, pResult, pSup->pCtx, numOutput, pSup->rowEntryInfoOffset);
5
54liuyao 已提交
3029
  if (code != TSDB_CODE_SUCCESS || (*pResult) == NULL) {
S
Shengliang Guan 已提交
3030
    return TSDB_CODE_OUT_OF_MEMORY;
5
54liuyao 已提交
3031
  }
5
54liuyao 已提交
3032
  updateTimeWindowInfo(pTimeWindowData, &pCurWin->sessionWin.win, false);
H
Haojun Liao 已提交
3033
  applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, pTimeWindowData, startIndex, winRows, rows, numOutput);
5
54liuyao 已提交
3034 3035 3036
  return TSDB_CODE_SUCCESS;
}

5
54liuyao 已提交
3037 3038
static bool doDeleteSessionWindow(SStreamAggSupporter* pAggSup, SSessionKey* pKey) {
  streamStateSessionDel(pAggSup->pState, pKey);
5
54liuyao 已提交
3039 3040 3041
  SSessionKey hashKey = {0};
  getSessionHashKey(pKey, &hashKey);
  tSimpleHashRemove(pAggSup->pResultRows, &hashKey, sizeof(SSessionKey));
5
54liuyao 已提交
3042 3043 3044 3045 3046 3047 3048 3049 3050 3051
  return true;
}

static int32_t setSessionWinOutputInfo(SSHashObj* pStUpdated, SResultWindowInfo* pWinInfo) {
  void* pVal = tSimpleHashGet(pStUpdated, &pWinInfo->sessionWin, sizeof(SSessionKey));
  if (pVal) {
    SResultWindowInfo* pWin = pVal;
    pWinInfo->isOutput = pWin->isOutput;
  }
  return TSDB_CODE_SUCCESS;
5
54liuyao 已提交
3052 3053
}

5
54liuyao 已提交
3054 3055 3056 3057 3058 3059 3060
SStreamStateCur* getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj* pStUpdated, SResultWindowInfo* pCurWin,
                                       SResultWindowInfo* pNextWin) {
  SStreamStateCur* pCur = streamStateSessionSeekKeyNext(pAggSup->pState, &pCurWin->sessionWin);
  pNextWin->isOutput = true;
  setSessionWinOutputInfo(pStUpdated, pNextWin);
  int32_t size = 0;
  pNextWin->sessionWin = pCurWin->sessionWin;
3061
  int32_t code = streamStateSessionGetKVByCur(pCur, &pNextWin->sessionWin, &pNextWin->pOutputBuf, &size);
5
54liuyao 已提交
3062 3063 3064 3065
  if (code != TSDB_CODE_SUCCESS) {
    SET_SESSION_WIN_INVALID(*pNextWin);
  }
  return pCur;
5
54liuyao 已提交
3066 3067
}

5
54liuyao 已提交
3068 3069 3070 3071 3072 3073 3074 3075 3076
static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCurWin, SSHashObj* pStUpdated,
                                 SSHashObj* pStDeleted) {
  SExprSupp*                     pSup = &pOperator->exprSupp;
  SExecTaskInfo*                 pTaskInfo = pOperator->pTaskInfo;
  SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
  SResultRow*                    pCurResult = NULL;
  int32_t                        numOfOutput = pOperator->exprSupp.numOfExprs;
  SStreamAggSupporter*           pAggSup = &pInfo->streamAggSup;
  initSessionOutputBuf(pCurWin, &pCurResult, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
5
54liuyao 已提交
3077
  // Just look for the window behind StartIndex
5
54liuyao 已提交
3078 3079 3080 3081 3082 3083
  while (1) {
    SResultWindowInfo winInfo = {0};
    SStreamStateCur*  pCur = getNextSessionWinInfo(pAggSup, pStUpdated, pCurWin, &winInfo);
    if (!IS_VALID_SESSION_WIN(winInfo) || !isInWindow(pCurWin, winInfo.sessionWin.win.skey, pAggSup->gap)) {
      streamStateFreeCur(pCur);
      break;
5
54liuyao 已提交
3084
    }
5
54liuyao 已提交
3085 3086 3087 3088 3089 3090 3091 3092 3093 3094 3095 3096
    SResultRow* pWinResult = NULL;
    initSessionOutputBuf(&winInfo, &pWinResult, pAggSup->pDummyCtx, numOfOutput, pSup->rowEntryInfoOffset);
    pCurWin->sessionWin.win.ekey = TMAX(pCurWin->sessionWin.win.ekey, winInfo.sessionWin.win.ekey);
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pCurWin->sessionWin.win, true);
    compactFunctions(pSup->pCtx, pAggSup->pDummyCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
    tSimpleHashRemove(pStUpdated, &winInfo.sessionWin, sizeof(SSessionKey));
    if (winInfo.isOutput && pStDeleted) {
      saveDeleteRes(pStDeleted, winInfo.sessionWin);
    }
    removeSessionResult(pStUpdated, pAggSup->pResultRows, winInfo.sessionWin);
    doDeleteSessionWindow(pAggSup, &winInfo.sessionWin);
    streamStateFreeCur(pCur);
5
54liuyao 已提交
3097 3098 3099
  }
}

5
54liuyao 已提交
3100 3101 3102
int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo) {
  saveSessionDiscBuf(pAggSup->pState, &pWinInfo->sessionWin, pWinInfo->pOutputBuf, pAggSup->resultRowSize);
  return TSDB_CODE_SUCCESS;
5
54liuyao 已提交
3103 3104
}

5
54liuyao 已提交
3105 3106
static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pStUpdated,
                                   SSHashObj* pStDeleted, bool hasEndTs) {
X
Xiaoyu Wang 已提交
3107
  SExecTaskInfo*                 pTaskInfo = pOperator->pTaskInfo;
5
54liuyao 已提交
3108
  SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
3109
  int32_t                        numOfOutput = pOperator->exprSupp.numOfExprs;
H
Haojun Liao 已提交
3110
  uint64_t                       groupId = pSDataBlock->info.id.groupId;
X
Xiaoyu Wang 已提交
3111
  int64_t                        code = TSDB_CODE_SUCCESS;
5
54liuyao 已提交
3112 3113 3114
  SResultRow*                    pResult = NULL;
  int32_t                        rows = pSDataBlock->info.rows;
  int32_t                        winRows = 0;
X
Xiaoyu Wang 已提交
3115

5
54liuyao 已提交
3116
  SColumnInfoData* pStartTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
5
54liuyao 已提交
3117
  TSKEY*           startTsCols = (int64_t*)pStartTsCol->pData;
5
54liuyao 已提交
3118 3119 3120
  SColumnInfoData* pEndTsCol = NULL;
  if (hasEndTs) {
    pEndTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->endTsIndex);
5
54liuyao 已提交
3121
  } else {
5
54liuyao 已提交
3122
    pEndTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
5
54liuyao 已提交
3123
  }
X
Xiaoyu Wang 已提交
3124

5
54liuyao 已提交
3125
  TSKEY*               endTsCols = (int64_t*)pEndTsCol->pData;
5
54liuyao 已提交
3126
  SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
5
54liuyao 已提交
3127
  for (int32_t i = 0; i < rows;) {
5
54liuyao 已提交
3128
    if (pInfo->ignoreExpiredData && isOverdue(endTsCols[i], &pInfo->twAggSup)) {
5
54liuyao 已提交
3129 3130 3131
      i++;
      continue;
    }
5
54liuyao 已提交
3132 3133 3134 3135 3136
    SResultWindowInfo winInfo = {0};
    setSessionOutputBuf(pAggSup, startTsCols[i], endTsCols[i], groupId, &winInfo);
    setSessionWinOutputInfo(pStUpdated, &winInfo);
    winRows = updateSessionWindowInfo(&winInfo, startTsCols, endTsCols, groupId, rows, i, pAggSup->gap,
                                      pAggSup->pResultRows, pStUpdated, pStDeleted);
5
54liuyao 已提交
3137 3138
    // coverity scan error
    if (!winInfo.pOutputBuf) {
S
Shengliang Guan 已提交
3139
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
5
54liuyao 已提交
3140
    }
L
Liu Jicong 已提交
3141

5
54liuyao 已提交
3142 3143
    code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &winInfo, &pResult, i, winRows, rows, numOfOutput,
                              pOperator);
5
54liuyao 已提交
3144
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
S
Shengliang Guan 已提交
3145
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
5
54liuyao 已提交
3146
    }
5
54liuyao 已提交
3147 3148
    compactSessionWindow(pOperator, &winInfo, pStUpdated, pStDeleted);
    saveSessionOutputBuf(pAggSup, &winInfo);
5
54liuyao 已提交
3149 3150

    if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pStUpdated) {
5
54liuyao 已提交
3151
      code = saveResult(winInfo, pStUpdated);
5
54liuyao 已提交
3152
      if (code != TSDB_CODE_SUCCESS) {
S
Shengliang Guan 已提交
3153
        T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
5
54liuyao 已提交
3154
      }
5
54liuyao 已提交
3155
    }
5
54liuyao 已提交
3156
    if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
5
54liuyao 已提交
3157 3158
      SSessionKey key = {0};
      getSessionHashKey(&winInfo.sessionWin, &key);
5
54liuyao 已提交
3159 3160 3161
      tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo));
    }

5
54liuyao 已提交
3162 3163 3164 3165
    i += winRows;
  }
}

5
54liuyao 已提交
3166
void deleteWindow(SArray* pWinInfos, int32_t index, FDelete fp) {
3167
  ASSERT(index >= 0 && index < taosArrayGetSize(pWinInfos));
5
54liuyao 已提交
3168 3169 3170 3171
  if (fp) {
    void* ptr = taosArrayGet(pWinInfos, index);
    fp(ptr);
  }
5
54liuyao 已提交
3172 3173 3174
  taosArrayRemove(pWinInfos, index);
}

5
54liuyao 已提交
3175
static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SArray* result) {
5
54liuyao 已提交
3176
  SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
3177
  TSKEY*           startDatas = (TSKEY*)pStartTsCol->pData;
5
54liuyao 已提交
3178
  SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
3179
  TSKEY*           endDatas = (TSKEY*)pEndTsCol->pData;
3180
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
3181
  uint64_t*        gpDatas = (uint64_t*)pGroupCol->pData;
5
54liuyao 已提交
3182
  for (int32_t i = 0; i < pBlock->info.rows; i++) {
5
54liuyao 已提交
3183 3184 3185 3186
    while (1) {
      SSessionKey curWin = {0};
      getCurSessionWindow(pAggSup, startDatas[i], endDatas[i], gpDatas[i], &curWin);
      if (IS_INVALID_SESSION_WIN_KEY(curWin)) {
3187 3188
        break;
      }
5
54liuyao 已提交
3189 3190 3191 3192
      doDeleteSessionWindow(pAggSup, &curWin);
      if (result) {
        saveDeleteInfo(result, curWin);
      }
3193
    }
5
54liuyao 已提交
3194 3195 3196
  }
}

5
54liuyao 已提交
3197 3198 3199 3200 3201 3202 3203 3204
static inline int32_t sessionKeyCompareAsc(const void* pKey1, const void* pKey2) {
  SSessionKey* pWin1 = (SSessionKey*)pKey1;
  SSessionKey* pWin2 = (SSessionKey*)pKey2;

  if (pWin1->groupId > pWin2->groupId) {
    return 1;
  } else if (pWin1->groupId < pWin2->groupId) {
    return -1;
5
54liuyao 已提交
3205 3206
  }

5
54liuyao 已提交
3207 3208 3209 3210 3211 3212 3213 3214 3215 3216 3217 3218 3219 3220 3221
  if (pWin1->win.skey > pWin2->win.skey) {
    return 1;
  } else if (pWin1->win.skey < pWin2->win.skey) {
    return -1;
  }

  return 0;
}

static int32_t copyUpdateResult(SSHashObj* pStUpdated, SArray* pUpdated) {
  void*   pIte = NULL;
  size_t  keyLen = 0;
  int32_t iter = 0;
  while ((pIte = tSimpleHashIterate(pStUpdated, pIte, &iter)) != NULL) {
    void* key = tSimpleHashGetKey(pIte, &keyLen);
3222
    ASSERT(keyLen == sizeof(SSessionKey));
5
54liuyao 已提交
3223 3224 3225 3226 3227 3228
    taosArrayPush(pUpdated, key);
  }
  taosArraySort(pUpdated, sessionKeyCompareAsc);
  return TSDB_CODE_SUCCESS;
}

3229
void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite) {
5
54liuyao 已提交
3230 3231 3232 3233
  blockDataCleanup(pBlock);
  int32_t size = tSimpleHashGetSize(pStDeleted);
  if (size == 0) {
    return;
3234 3235
  }
  blockDataEnsureCapacity(pBlock, size);
5
54liuyao 已提交
3236 3237 3238 3239 3240 3241 3242
  size_t  keyLen = 0;
  int32_t iter = 0;
  while (((*Ite) = tSimpleHashIterate(pStDeleted, *Ite, &iter)) != NULL) {
    if (pBlock->info.rows + 1 > pBlock->info.capacity) {
      break;
    }
    SSessionKey*     res = tSimpleHashGetKey(*Ite, &keyLen);
3243
    SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
5
54liuyao 已提交
3244
    colDataAppend(pStartTsCol, pBlock->info.rows, (const char*)&res->win.skey, false);
3245
    SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
5
54liuyao 已提交
3246
    colDataAppend(pEndTsCol, pBlock->info.rows, (const char*)&res->win.skey, false);
3247 3248
    SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
    colDataAppendNULL(pUidCol, pBlock->info.rows);
5
54liuyao 已提交
3249 3250
    SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
    colDataAppend(pGpCol, pBlock->info.rows, (const char*)&res->groupId, false);
3251 3252 3253 3254
    SColumnInfoData* pCalStCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
    colDataAppendNULL(pCalStCol, pBlock->info.rows);
    SColumnInfoData* pCalEdCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
    colDataAppendNULL(pCalEdCol, pBlock->info.rows);
3255 3256

    SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
3257 3258 3259

    void* tbname = NULL;
    streamStateGetParName(pOp->pTaskInfo->streamInfo.pState, res->groupId, &tbname);
3260 3261 3262 3263 3264 3265
    if (tbname == NULL) {
      colDataAppendNULL(pTableCol, pBlock->info.rows);
    } else {
      char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN];
      STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
      colDataAppend(pTableCol, pBlock->info.rows, (const char*)parTbName, false);
L
Liu Jicong 已提交
3266
      tdbFree(tbname);
3267
    }
5
54liuyao 已提交
3268 3269 3270
    pBlock->info.rows += 1;
  }
  if ((*Ite) == NULL) {
5
54liuyao 已提交
3271
    tSimpleHashClear(pStDeleted);
5
54liuyao 已提交
3272 3273 3274
  }
}

5
54liuyao 已提交
3275 3276 3277 3278 3279 3280 3281 3282
static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SSHashObj* pStUpdated) {
  SExprSupp*                     pSup = &pOperator->exprSupp;
  SExecTaskInfo*                 pTaskInfo = pOperator->pTaskInfo;
  int32_t                        size = taosArrayGetSize(pWinArray);
  SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
  SStreamAggSupporter*           pAggSup = &pInfo->streamAggSup;
  int32_t                        numOfOutput = pSup->numOfExprs;
  int32_t                        numOfChildren = taosArrayGetSize(pInfo->pChildren);
3283
  ASSERT(pInfo->pChildren);
3284

3285
  for (int32_t i = 0; i < size; i++) {
5
54liuyao 已提交
3286 3287 3288
    SSessionKey*      pWinKey = taosArrayGet(pWinArray, i);
    int32_t           num = 0;
    SResultWindowInfo parentWin = {0};
3289
    for (int32_t j = 0; j < numOfChildren; j++) {
X
Xiaoyu Wang 已提交
3290
      SOperatorInfo*                 pChild = taosArrayGetP(pInfo->pChildren, j);
3291
      SStreamSessionAggOperatorInfo* pChInfo = pChild->info;
5
54liuyao 已提交
3292
      SStreamAggSupporter*           pChAggSup = &pChInfo->streamAggSup;
5
54liuyao 已提交
3293 3294
      SSessionKey                    chWinKey = {0};
      getSessionHashKey(pWinKey, &chWinKey);
3295 3296 3297
      SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pChAggSup->pState, &chWinKey);
      SResultRow*      pResult = NULL;
      SResultRow*      pChResult = NULL;
5
54liuyao 已提交
3298 3299 3300 3301 3302 3303 3304 3305 3306 3307 3308 3309
      while (1) {
        SResultWindowInfo childWin = {0};
        childWin.sessionWin = *pWinKey;
        int32_t code = getSessionWinBuf(pChAggSup, pCur, &childWin);
        if (code == TSDB_CODE_SUCCESS && pWinKey->win.skey <= childWin.sessionWin.win.skey &&
            childWin.sessionWin.win.ekey <= pWinKey->win.ekey) {
          if (num == 0) {
            setSessionOutputBuf(pAggSup, pWinKey->win.skey, pWinKey->win.ekey, pWinKey->groupId, &parentWin);
            code = initSessionOutputBuf(&parentWin, &pResult, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
            if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
              break;
            }
3310
          }
5
54liuyao 已提交
3311 3312 3313 3314 3315 3316 3317 3318
          num++;
          updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin.sessionWin.win, true);
          initSessionOutputBuf(&childWin, &pChResult, pChild->exprSupp.pCtx, numOfOutput,
                               pChild->exprSupp.rowEntryInfoOffset);
          compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
          compactSessionWindow(pOperator, &parentWin, pStUpdated, NULL);
          saveResult(parentWin, pStUpdated);
        } else {
5
54liuyao 已提交
3319
          break;
3320 3321
        }
      }
5
54liuyao 已提交
3322 3323 3324 3325
      streamStateFreeCur(pCur);
    }
    if (num > 0) {
      saveSessionOutputBuf(pAggSup, &parentWin);
3326 3327 3328 3329
    }
  }
}

5
54liuyao 已提交
3330 3331 3332 3333 3334 3335 3336 3337 3338 3339 3340
int32_t closeSessionWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SSHashObj* pClosed) {
  void*   pIte = NULL;
  size_t  keyLen = 0;
  int32_t iter = 0;
  while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
    SResultWindowInfo* pWinInfo = pIte;
    if (isCloseWindow(&pWinInfo->sessionWin.win, pTwSup)) {
      if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE && pClosed) {
        int32_t code = saveResult(*pWinInfo, pClosed);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
5
54liuyao 已提交
3341 3342
        }
      }
3343 3344
      SSessionKey* pKey = tSimpleHashGetKey(pIte, &keyLen);
      tSimpleHashIterateRemove(pHashMap, pKey, sizeof(SSessionKey), &pIte, &iter);
5
54liuyao 已提交
3345 3346 3347 3348 3349
    }
  }
  return TSDB_CODE_SUCCESS;
}

5
54liuyao 已提交
3350
static void closeChildSessionWindow(SArray* pChildren, TSKEY maxTs) {
5
54liuyao 已提交
3351 3352 3353 3354 3355
  int32_t size = taosArrayGetSize(pChildren);
  for (int32_t i = 0; i < size; i++) {
    SOperatorInfo*                 pChildOp = taosArrayGetP(pChildren, i);
    SStreamSessionAggOperatorInfo* pChInfo = pChildOp->info;
    pChInfo->twAggSup.maxTs = TMAX(pChInfo->twAggSup.maxTs, maxTs);
5
54liuyao 已提交
3356
    closeSessionWindow(pChInfo->streamAggSup.pResultRows, &pChInfo->twAggSup, NULL);
5
54liuyao 已提交
3357 3358 3359
  }
}

5
54liuyao 已提交
3360 3361 3362 3363
int32_t getAllSessionWindow(SSHashObj* pHashMap, SSHashObj* pStUpdated) {
  void*   pIte = NULL;
  int32_t iter = 0;
  while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
3364
    SResultWindowInfo* pWinInfo = pIte;
5
54liuyao 已提交
3365
    saveResult(*pWinInfo, pStUpdated);
5
54liuyao 已提交
3366 3367 3368 3369
  }
  return TSDB_CODE_SUCCESS;
}

5
54liuyao 已提交
3370
static void copyDeleteWindowInfo(SArray* pResWins, SSHashObj* pStDeleted) {
5
54liuyao 已提交
3371 3372
  int32_t size = taosArrayGetSize(pResWins);
  for (int32_t i = 0; i < size; i++) {
5
54liuyao 已提交
3373 3374
    SSessionKey* pWinKey = taosArrayGet(pResWins, i);
    if (!pWinKey) continue;
5
54liuyao 已提交
3375 3376
    SSessionKey winInfo = {0};
    getSessionHashKey(pWinKey, &winInfo);
5
54liuyao 已提交
3377
    tSimpleHashPut(pStDeleted, &winInfo, sizeof(SSessionKey), NULL, 0);
3378 3379 3380
  }
}

5
54liuyao 已提交
3381 3382 3383
void initGroupResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList) {
  pGroupResInfo->pRows = pArrayList;
  pGroupResInfo->index = 0;
3384
  ASSERT(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo));
3385 3386
}

5
54liuyao 已提交
3387 3388 3389 3390 3391 3392 3393 3394 3395 3396
void doBuildSessionResult(SOperatorInfo* pOperator, SStreamState* pState, SGroupResInfo* pGroupResInfo,
                          SSDataBlock* pBlock) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  // set output datablock version
  pBlock->info.version = pTaskInfo->version;

  blockDataCleanup(pBlock);
  if (!hasRemainResults(pGroupResInfo)) {
    taosArrayDestroy(pGroupResInfo->pRows);
    pGroupResInfo->pRows = NULL;
3397 3398 3399
    return;
  }

5
54liuyao 已提交
3400
  // clear the existed group id
H
Haojun Liao 已提交
3401
  pBlock->info.id.groupId = 0;
3402
  buildSessionResultDataBlock(pOperator, pState, pBlock, &pOperator->exprSupp, pGroupResInfo);
5
54liuyao 已提交
3403 3404
}

5
54liuyao 已提交
3405
static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
5
54liuyao 已提交
3406
  SExprSupp*                     pSup = &pOperator->exprSupp;
5
54liuyao 已提交
3407
  SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
X
Xiaoyu Wang 已提交
3408
  SOptrBasicInfo*                pBInfo = &pInfo->binfo;
5
54liuyao 已提交
3409
  TSKEY                          maxTs = INT64_MIN;
5
54liuyao 已提交
3410
  SStreamAggSupporter*           pAggSup = &pInfo->streamAggSup;
5
54liuyao 已提交
3411 3412 3413
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  } else if (pOperator->status == OP_RES_TO_RETURN) {
3414
    doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
5
54liuyao 已提交
3415
    if (pInfo->pDelRes->info.rows > 0) {
5
54liuyao 已提交
3416
      printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
5
54liuyao 已提交
3417 3418
      return pInfo->pDelRes;
    }
5
54liuyao 已提交
3419 3420 3421 3422
    doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes);
    if (pBInfo->pRes->info.rows > 0) {
      printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
      return pBInfo->pRes;
5
54liuyao 已提交
3423
    }
5
54liuyao 已提交
3424

H
Haojun Liao 已提交
3425
    setOperatorCompleted(pOperator);
5
54liuyao 已提交
3426
    return NULL;
5
54liuyao 已提交
3427 3428
  }

X
Xiaoyu Wang 已提交
3429
  _hash_fn_t     hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
5
54liuyao 已提交
3430
  SSHashObj*     pStUpdated = tSimpleHashInit(64, hashFn);
5
54liuyao 已提交
3431
  SOperatorInfo* downstream = pOperator->pDownstream[0];
5
54liuyao 已提交
3432
  SArray*        pUpdated = taosArrayInit(16, sizeof(SSessionKey));  // SResKeyPos
5
54liuyao 已提交
3433 3434 3435 3436 3437
  while (1) {
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
    if (pBlock == NULL) {
      break;
    }
5
54liuyao 已提交
3438
    printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "final session recv" : "single session recv");
3439

5
54liuyao 已提交
3440 3441 3442
    if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
        pBlock->info.type == STREAM_CLEAR) {
      SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
5
54liuyao 已提交
3443
      // gap must be 0
5
54liuyao 已提交
3444 3445
      doDeleteTimeWindows(pAggSup, pBlock, pWins);
      removeSessionResults(pStUpdated, pWins);
5
54liuyao 已提交
3446 3447 3448 3449 3450
      if (IS_FINAL_OP(pInfo)) {
        int32_t                        childIndex = getChildIndex(pBlock);
        SOperatorInfo*                 pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
        SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info;
        // gap must be 0
5
54liuyao 已提交
3451 3452
        doDeleteTimeWindows(&pChildInfo->streamAggSup, pBlock, NULL);
        rebuildSessionWindow(pOperator, pWins, pStUpdated);
5
54liuyao 已提交
3453 3454 3455 3456
      }
      copyDeleteWindowInfo(pWins, pInfo->pStDeleted);
      taosArrayDestroy(pWins);
      continue;
3457
    } else if (pBlock->info.type == STREAM_GET_ALL) {
5
54liuyao 已提交
3458
      getAllSessionWindow(pAggSup->pResultRows, pStUpdated);
5
54liuyao 已提交
3459
      continue;
5
54liuyao 已提交
3460
    }
5
54liuyao 已提交
3461

5
54liuyao 已提交
3462 3463 3464 3465
    if (pInfo->scalarSupp.pExprInfo != NULL) {
      SExprSupp* pExprSup = &pInfo->scalarSupp;
      projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
    }
3466
    // the pDataBlock are always the same one, no need to call this again
3467
    setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
5
54liuyao 已提交
3468 3469 3470 3471 3472 3473
    doStreamSessionAggImpl(pOperator, pBlock, pStUpdated, pInfo->pStDeleted, IS_FINAL_OP(pInfo));
    if (IS_FINAL_OP(pInfo)) {
      int32_t chIndex = getChildIndex(pBlock);
      int32_t size = taosArrayGetSize(pInfo->pChildren);
      // if chIndex + 1 - size > 0, add new child
      for (int32_t i = 0; i < chIndex + 1 - size; i++) {
3474 3475
        SOperatorInfo* pChildOp =
            createStreamFinalSessionAggOperatorInfo(NULL, pInfo->pPhyNode, pOperator->pTaskInfo, 0);
5
54liuyao 已提交
3476
        if (!pChildOp) {
S
Shengliang Guan 已提交
3477
          T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
5
54liuyao 已提交
3478 3479 3480
        }
        taosArrayPush(pInfo->pChildren, &pChildOp);
      }
3481
      SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex);
3482
      setInputDataBlock(&pChildOp->exprSupp, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
5
54liuyao 已提交
3483
      doStreamSessionAggImpl(pChildOp, pBlock, NULL, NULL, true);
3484
    }
5
54liuyao 已提交
3485
    maxTs = TMAX(maxTs, pBlock->info.window.ekey);
3486
    maxTs = TMAX(maxTs, pBlock->info.watermark);
5
54liuyao 已提交
3487
  }
5
54liuyao 已提交
3488 3489

  pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
5
54liuyao 已提交
3490 3491
  // restore the value
  pOperator->status = OP_RES_TO_RETURN;
H
Haojun Liao 已提交
3492

5
54liuyao 已提交
3493 3494
  closeSessionWindow(pAggSup->pResultRows, &pInfo->twAggSup, pStUpdated);
  closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs);
5
54liuyao 已提交
3495
  copyUpdateResult(pStUpdated, pUpdated);
5
54liuyao 已提交
3496 3497 3498
  removeSessionResults(pInfo->pStDeleted, pUpdated);
  tSimpleHashCleanup(pStUpdated);
  initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
5
54liuyao 已提交
3499
  blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
5
54liuyao 已提交
3500

3501 3502 3503 3504 3505 3506
#if 0
  char* pBuf = streamStateSessionDump(pAggSup->pState);
  qDebug("===stream===final session%s", pBuf);
  taosMemoryFree(pBuf);
#endif

3507
  doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
5
54liuyao 已提交
3508
  if (pInfo->pDelRes->info.rows > 0) {
5
54liuyao 已提交
3509
    printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
5
54liuyao 已提交
3510 3511
    return pInfo->pDelRes;
  }
5
54liuyao 已提交
3512 3513 3514 3515 3516 3517 3518

  doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes);
  if (pBInfo->pRes->info.rows > 0) {
    printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
    return pBInfo->pRes;
  }

H
Haojun Liao 已提交
3519
  setOperatorCompleted(pOperator);
5
54liuyao 已提交
3520
  return NULL;
5
54liuyao 已提交
3521 3522
}

5
54liuyao 已提交
3523 3524 3525 3526 3527 3528 3529 3530 3531 3532 3533 3534 3535 3536 3537 3538 3539 3540 3541 3542
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
                                                  SExecTaskInfo* pTaskInfo) {
  SSessionWinodwPhysiNode*       pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
  int32_t                        numOfCols = 0;
  int32_t                        code = TSDB_CODE_OUT_OF_MEMORY;
  SStreamSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamSessionAggOperatorInfo));
  SOperatorInfo*                 pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  pOperator->pTaskInfo = pTaskInfo;

  initResultSizeInfo(&pOperator->resultInfo, 4096);
  if (pSessionNode->window.pExprs != NULL) {
    int32_t    numOfScalar = 0;
    SExprInfo* pScalarExprInfo = createExprInfo(pSessionNode->window.pExprs, NULL, &numOfScalar);
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
5
54liuyao 已提交
3543 3544
    }
  }
5
54liuyao 已提交
3545 3546 3547
  SExprSupp* pSup = &pOperator->exprSupp;

  SExprInfo*   pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols);
H
Haojun Liao 已提交
3548
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
5
54liuyao 已提交
3549 3550 3551 3552 3553 3554 3555 3556 3557 3558 3559 3560 3561 3562 3563 3564 3565 3566 3567 3568 3569 3570 3571 3572 3573 3574 3575 3576 3577 3578 3579 3580 3581 3582 3583
  code = initBasicInfoEx(&pInfo->binfo, pSup, pExprInfo, numOfCols, pResBlock);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  code = initStreamAggSupporter(&pInfo->streamAggSup, pSup->pCtx, numOfCols, pSessionNode->gap,
                                pTaskInfo->streamInfo.pState, 0, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->twAggSup = (STimeWindowAggSupp){
      .waterMark = pSessionNode->window.watermark,
      .calTrigger = pSessionNode->window.triggerType,
      .maxTs = INT64_MIN,
      .minTs = INT64_MAX,
  };

  initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);

  pInfo->primaryTsIndex = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
  if (pSessionNode->window.pTsEnd) {
    pInfo->endTsIndex = ((SColumnNode*)pSessionNode->window.pTsEnd)->slotId;
  }
  pInfo->binfo.pRes = pResBlock;
  pInfo->order = TSDB_ORDER_ASC;
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  pInfo->pStDeleted = tSimpleHashInit(64, hashFn);
  pInfo->pDelIterator = NULL;
  pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
  pInfo->pChildren = NULL;
  pInfo->isFinal = false;
  pInfo->pPhyNode = pPhyNode;
  pInfo->ignoreExpiredData = pSessionNode->window.igExpired;

H
Haojun Liao 已提交
3584 3585 3586
  setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true,
                  OP_NOT_OPENED, pInfo, pTaskInfo);
  pOperator->fpSet =
3587
      createOperatorFpSet(optrDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL);
H
Haojun Liao 已提交
3588

5
54liuyao 已提交
3589
  if (downstream) {
5
54liuyao 已提交
3590
    initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup);
5
54liuyao 已提交
3591 3592 3593 3594 3595 3596 3597 3598 3599 3600 3601 3602 3603 3604 3605 3606 3607
    code = appendDownstream(pOperator, &downstream, 1);
  }
  return pOperator;

_error:
  if (pInfo != NULL) {
    destroyStreamSessionAggOperatorInfo(pInfo);
  }

  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

static void clearStreamSessionOperator(SStreamSessionAggOperatorInfo* pInfo) {
  tSimpleHashClear(pInfo->streamAggSup.pResultRows);
  streamStateSessionClear(pInfo->streamAggSup.pState);
5
54liuyao 已提交
3608 3609 3610 3611 3612 3613 3614
}

static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
  SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
  SOptrBasicInfo*                pBInfo = &pInfo->binfo;
  TSKEY                          maxTs = INT64_MIN;
  SExprSupp*                     pSup = &pOperator->exprSupp;
5
54liuyao 已提交
3615
  SStreamAggSupporter*           pAggSup = &pInfo->streamAggSup;
3616

5
54liuyao 已提交
3617 3618
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
3619
  }
L
Liu Jicong 已提交
3620

3621
  {
5
54liuyao 已提交
3622
    doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes);
5
54liuyao 已提交
3623
    if (pBInfo->pRes->info.rows > 0) {
H
Haojun Liao 已提交
3624
      printDataBlock(pBInfo->pRes, "semi session");
5
54liuyao 已提交
3625 3626 3627
      return pBInfo->pRes;
    }

3628
    doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
3629
    if (pInfo->pDelRes->info.rows > 0) {
5
54liuyao 已提交
3630
      printDataBlock(pInfo->pDelRes, "semi session delete");
5
54liuyao 已提交
3631 3632
      return pInfo->pDelRes;
    }
5
54liuyao 已提交
3633

3634
    if (pOperator->status == OP_RES_TO_RETURN) {
5
54liuyao 已提交
3635
      clearFunctionContext(&pOperator->exprSupp);
3636 3637
      // semi interval operator clear disk buffer
      clearStreamSessionOperator(pInfo);
H
Haojun Liao 已提交
3638
      setOperatorCompleted(pOperator);
3639 3640
      return NULL;
    }
5
54liuyao 已提交
3641 3642 3643
  }

  _hash_fn_t     hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
5
54liuyao 已提交
3644
  SSHashObj*     pStUpdated = tSimpleHashInit(64, hashFn);
5
54liuyao 已提交
3645
  SOperatorInfo* downstream = pOperator->pDownstream[0];
5
54liuyao 已提交
3646
  SArray*        pUpdated = taosArrayInit(16, sizeof(SSessionKey));
5
54liuyao 已提交
3647 3648 3649
  while (1) {
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
    if (pBlock == NULL) {
5
54liuyao 已提交
3650
      clearSpecialDataBlock(pInfo->pUpdateRes);
3651
      pOperator->status = OP_RES_TO_RETURN;
5
54liuyao 已提交
3652 3653
      break;
    }
H
Haojun Liao 已提交
3654
    printDataBlock(pBlock, "semi session recv");
5
54liuyao 已提交
3655

5
54liuyao 已提交
3656 3657
    if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
        pBlock->info.type == STREAM_CLEAR) {
5
54liuyao 已提交
3658
      // gap must be 0
3659
      SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
5
54liuyao 已提交
3660
      doDeleteTimeWindows(&pInfo->streamAggSup, pBlock, pWins);
3661
      removeSessionResults(pStUpdated, pWins);
5
54liuyao 已提交
3662
      copyDeleteWindowInfo(pWins, pInfo->pStDeleted);
3663
      taosArrayDestroy(pWins);
5
54liuyao 已提交
3664
      break;
5
54liuyao 已提交
3665
    } else if (pBlock->info.type == STREAM_GET_ALL) {
5
54liuyao 已提交
3666
      getAllSessionWindow(pInfo->streamAggSup.pResultRows, pStUpdated);
5
54liuyao 已提交
3667 3668 3669
      continue;
    }

5
54liuyao 已提交
3670 3671 3672 3673
    if (pInfo->scalarSupp.pExprInfo != NULL) {
      SExprSupp* pExprSup = &pInfo->scalarSupp;
      projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
    }
5
54liuyao 已提交
3674
    // the pDataBlock are always the same one, no need to call this again
3675
    setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
3676
    doStreamSessionAggImpl(pOperator, pBlock, pStUpdated, NULL, false);
5
54liuyao 已提交
3677 3678 3679 3680
    maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
  }

  pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
3681
  pBInfo->pRes->info.watermark = pInfo->twAggSup.maxTs;
3682

5
54liuyao 已提交
3683
  copyUpdateResult(pStUpdated, pUpdated);
5
54liuyao 已提交
3684 3685
  removeSessionResults(pInfo->pStDeleted, pUpdated);
  tSimpleHashCleanup(pStUpdated);
5
54liuyao 已提交
3686

5
54liuyao 已提交
3687
  initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
5
54liuyao 已提交
3688
  blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
5
54liuyao 已提交
3689

3690 3691 3692 3693 3694 3695
#if 0
  char* pBuf = streamStateSessionDump(pAggSup->pState);
  qDebug("===stream===semi session%s", pBuf);
  taosMemoryFree(pBuf);
#endif

5
54liuyao 已提交
3696
  doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes);
5
54liuyao 已提交
3697
  if (pBInfo->pRes->info.rows > 0) {
H
Haojun Liao 已提交
3698
    printDataBlock(pBInfo->pRes, "semi session");
5
54liuyao 已提交
3699 3700 3701
    return pBInfo->pRes;
  }

3702
  doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
3703
  if (pInfo->pDelRes->info.rows > 0) {
5
54liuyao 已提交
3704
    printDataBlock(pInfo->pDelRes, "semi session delete");
5
54liuyao 已提交
3705 3706
    return pInfo->pDelRes;
  }
5
54liuyao 已提交
3707

5
54liuyao 已提交
3708 3709 3710
  clearFunctionContext(&pOperator->exprSupp);
  // semi interval operator clear disk buffer
  clearStreamSessionOperator(pInfo);
H
Haojun Liao 已提交
3711
  setOperatorCompleted(pOperator);
5
54liuyao 已提交
3712
  return NULL;
5
54liuyao 已提交
3713
}
3714

3715 3716
SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
                                                       SExecTaskInfo* pTaskInfo, int32_t numOfChild) {
3717 3718
  int32_t        code = TSDB_CODE_OUT_OF_MEMORY;
  SOperatorInfo* pOperator = createStreamSessionAggOperatorInfo(downstream, pPhyNode, pTaskInfo);
3719 3720 3721
  if (pOperator == NULL) {
    goto _error;
  }
H
Haojun Liao 已提交
3722

3723
  SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
5
54liuyao 已提交
3724

H
Haojun Liao 已提交
3725
  pInfo->isFinal = (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION);
L
Liu Jicong 已提交
3726
  char* name = (pInfo->isFinal) ? "StreamSessionFinalAggOperator" : "StreamSessionSemiAggOperator";
H
Haojun Liao 已提交
3727 3728

  if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
H
Haojun Liao 已提交
3729
    pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
5
54liuyao 已提交
3730
    blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
3731 3732
    pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAgg, NULL,
                                           destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL);
5
54liuyao 已提交
3733
  }
3734

L
Liu Jicong 已提交
3735
  setOperatorInfo(pOperator, name, pPhyNode->type, false, OP_NOT_OPENED, pInfo, pTaskInfo);
H
Haojun Liao 已提交
3736

5
54liuyao 已提交
3737 3738 3739 3740
  pOperator->operatorType = pPhyNode->type;
  if (numOfChild > 0) {
    pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*));
    for (int32_t i = 0; i < numOfChild; i++) {
5
54liuyao 已提交
3741 3742
      SOperatorInfo* pChildOp = createStreamFinalSessionAggOperatorInfo(NULL, pPhyNode, pTaskInfo, 0);
      if (pChildOp == NULL) {
5
54liuyao 已提交
3743 3744
        goto _error;
      }
5
54liuyao 已提交
3745 3746 3747 3748
      SStreamSessionAggOperatorInfo* pChInfo = pChildOp->info;
      pChInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
      streamStateSetNumber(pChInfo->streamAggSup.pState, i);
      taosArrayPush(pInfo->pChildren, &pChildOp);
3749 3750
    }
  }
3751 3752 3753 3754 3755

  if (!IS_FINAL_OP(pInfo) || numOfChild == 0) {
    pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
  }

3756 3757 3758 3759
  return pOperator;

_error:
  if (pInfo != NULL) {
3760
    destroyStreamSessionAggOperatorInfo(pInfo);
3761 3762 3763 3764 3765
  }
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}
5
54liuyao 已提交
3766

3767
void destroyStreamStateOperatorInfo(void* param) {
X
Xiaoyu Wang 已提交
3768
  SStreamStateAggOperatorInfo* pInfo = (SStreamStateAggOperatorInfo*)param;
3769
  cleanupBasicInfo(&pInfo->binfo);
5
54liuyao 已提交
3770
  destroyStreamAggSupporter(&pInfo->streamAggSup);
5
54liuyao 已提交
3771 3772 3773 3774
  cleanupGroupResInfo(&pInfo->groupResInfo);
  if (pInfo->pChildren != NULL) {
    int32_t size = taosArrayGetSize(pInfo->pChildren);
    for (int32_t i = 0; i < size; i++) {
5
54liuyao 已提交
3775 3776
      SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, i);
      destroyOperatorInfo(pChild);
5
54liuyao 已提交
3777
    }
5
54liuyao 已提交
3778
    taosArrayDestroy(pInfo->pChildren);
5
54liuyao 已提交
3779
  }
5
54liuyao 已提交
3780 3781
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
  blockDataDestroy(pInfo->pDelRes);
5
54liuyao 已提交
3782
  tSimpleHashCleanup(pInfo->pSeDeleted);
D
dapan1121 已提交
3783
  taosMemoryFreeClear(param);
5
54liuyao 已提交
3784 3785 3786
}

bool isTsInWindow(SStateWindowInfo* pWin, TSKEY ts) {
5
54liuyao 已提交
3787
  if (pWin->winInfo.sessionWin.win.skey <= ts && ts <= pWin->winInfo.sessionWin.win.ekey) {
5
54liuyao 已提交
3788 3789 3790 3791 3792 3793
    return true;
  }
  return false;
}

bool isEqualStateKey(SStateWindowInfo* pWin, char* pKeyData) {
5
54liuyao 已提交
3794 3795 3796 3797 3798 3799 3800 3801 3802 3803 3804 3805 3806 3807 3808 3809 3810 3811 3812 3813 3814 3815 3816 3817 3818 3819 3820 3821 3822 3823 3824 3825
  return pKeyData && compareVal(pKeyData, pWin->pStateKey);
}

bool compareStateKey(void* data, void* key) {
  SStateKeys* stateKey = (SStateKeys*)key;
  stateKey->pData = (char*)key + sizeof(SStateKeys);
  return compareVal(data, stateKey);
}

void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, char* pKeyData,
                       SStateWindowInfo* pCurWin, SStateWindowInfo* pNextWin) {
  int32_t size = pAggSup->resultRowSize;
  pCurWin->winInfo.sessionWin.groupId = groupId;
  pCurWin->winInfo.sessionWin.win.skey = ts;
  pCurWin->winInfo.sessionWin.win.ekey = ts;
  int32_t code =
      streamStateStateAddIfNotExist(pAggSup->pState, &pCurWin->winInfo.sessionWin, pKeyData, pAggSup->stateKeySize,
                                    compareStateKey, &pCurWin->winInfo.pOutputBuf, &size);
  pCurWin->pStateKey =
      (SStateKeys*)((char*)pCurWin->winInfo.pOutputBuf + (pAggSup->resultRowSize - pAggSup->stateKeySize));
  pCurWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys);
  pCurWin->pStateKey->type = pAggSup->stateKeyType;
  pCurWin->pStateKey->pData = (char*)pCurWin->pStateKey + sizeof(SStateKeys);
  pCurWin->pStateKey->isNull = false;

  if (code == TSDB_CODE_SUCCESS) {
    pCurWin->winInfo.isOutput = true;
  } else {
    if (IS_VAR_DATA_TYPE(pAggSup->stateKeyType)) {
      varDataCopy(pCurWin->pStateKey->pData, pKeyData);
    } else {
      memcpy(pCurWin->pStateKey->pData, pKeyData, pCurWin->pStateKey->bytes);
5
54liuyao 已提交
3826 3827 3828
    }
  }

5
54liuyao 已提交
3829 3830 3831 3832 3833 3834
  pNextWin->winInfo.sessionWin = pCurWin->winInfo.sessionWin;
  pNextWin->winInfo.pOutputBuf = NULL;
  SStreamStateCur* pCur = streamStateSessionSeekKeyNext(pAggSup->pState, &pCurWin->winInfo.sessionWin);
  code = streamStateSessionGetKVByCur(pCur, &pNextWin->winInfo.sessionWin, NULL, 0);
  if (code != TSDB_CODE_SUCCESS) {
    SET_SESSION_WIN_INVALID(pNextWin->winInfo);
5
54liuyao 已提交
3835
  }
5
54liuyao 已提交
3836
  streamStateFreeCur(pCur);
5
54liuyao 已提交
3837 3838
}

5
54liuyao 已提交
3839
int32_t updateStateWindowInfo(SStateWindowInfo* pWinInfo, SStateWindowInfo* pNextWin, TSKEY* pTs, uint64_t groupId,
H
Haojun Liao 已提交
3840
                              SColumnInfoData* pKeyCol, int32_t rows, int32_t start, bool* allEqual,
5
54liuyao 已提交
3841
                              SSHashObj* pResultRows, SSHashObj* pSeUpdated, SSHashObj* pSeDeleted) {
5
54liuyao 已提交
3842 3843 3844 3845
  *allEqual = true;
  for (int32_t i = start; i < rows; ++i) {
    char* pKeyData = colDataGetData(pKeyCol, i);
    if (!isTsInWindow(pWinInfo, pTs[i])) {
X
Xiaoyu Wang 已提交
3846
      if (isEqualStateKey(pWinInfo, pKeyData)) {
5
54liuyao 已提交
3847
        if (IS_VALID_SESSION_WIN(pNextWin->winInfo)) {
5
54liuyao 已提交
3848
          // ts belongs to the next window
5
54liuyao 已提交
3849
          if (pTs[i] >= pNextWin->winInfo.sessionWin.win.skey) {
5
54liuyao 已提交
3850 3851 3852 3853 3854 3855 3856
            return i - start;
          }
        }
      } else {
        return i - start;
      }
    }
5
54liuyao 已提交
3857 3858

    if (pWinInfo->winInfo.sessionWin.win.skey > pTs[i]) {
H
Haojun Liao 已提交
3859
      if (pSeDeleted && pWinInfo->winInfo.isOutput) {
5
54liuyao 已提交
3860
        saveDeleteRes(pSeDeleted, pWinInfo->winInfo.sessionWin);
5
54liuyao 已提交
3861
      }
5
54liuyao 已提交
3862 3863
      removeSessionResult(pSeUpdated, pResultRows, pWinInfo->winInfo.sessionWin);
      pWinInfo->winInfo.sessionWin.win.skey = pTs[i];
5
54liuyao 已提交
3864
    }
5
54liuyao 已提交
3865
    pWinInfo->winInfo.sessionWin.win.ekey = TMAX(pWinInfo->winInfo.sessionWin.win.ekey, pTs[i]);
5
54liuyao 已提交
3866 3867 3868 3869 3870 3871 3872
    if (!isEqualStateKey(pWinInfo, pKeyData)) {
      *allEqual = false;
    }
  }
  return rows - start;
}

5
54liuyao 已提交
3873 3874
static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pSeUpdated,
                                 SSHashObj* pStDeleted) {
X
Xiaoyu Wang 已提交
3875
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
5
54liuyao 已提交
3876
  SStreamStateAggOperatorInfo* pInfo = pOperator->info;
3877
  int32_t                      numOfOutput = pOperator->exprSupp.numOfExprs;
H
Haojun Liao 已提交
3878
  uint64_t                     groupId = pSDataBlock->info.id.groupId;
X
Xiaoyu Wang 已提交
3879 3880 3881 3882
  int64_t                      code = TSDB_CODE_SUCCESS;
  TSKEY*                       tsCols = NULL;
  SResultRow*                  pResult = NULL;
  int32_t                      winRows = 0;
5
54liuyao 已提交
3883
  if (pSDataBlock->pDataBlock != NULL) {
X
Xiaoyu Wang 已提交
3884 3885
    SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
    tsCols = (int64_t*)pColDataInfo->pData;
5
54liuyao 已提交
3886
  } else {
X
Xiaoyu Wang 已提交
3887
    return;
5
54liuyao 已提交
3888
  }
L
Liu Jicong 已提交
3889

5
54liuyao 已提交
3890
  SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
5
54liuyao 已提交
3891 3892
  int32_t              rows = pSDataBlock->info.rows;
  blockDataEnsureCapacity(pAggSup->pScanBlock, rows);
L
Liu Jicong 已提交
3893
  SColumnInfoData* pKeyColInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->stateCol.slotId);
5
54liuyao 已提交
3894
  for (int32_t i = 0; i < rows; i += winRows) {
5
54liuyao 已提交
3895
    if (pInfo->ignoreExpiredData && isOverdue(tsCols[i], &pInfo->twAggSup)) {
5
54liuyao 已提交
3896 3897 3898
      i++;
      continue;
    }
5
54liuyao 已提交
3899 3900 3901 3902 3903 3904 3905 3906 3907
    char*            pKeyData = colDataGetData(pKeyColInfo, i);
    int32_t          winIndex = 0;
    bool             allEqual = true;
    SStateWindowInfo curWin = {0};
    SStateWindowInfo nextWin = {0};
    setStateOutputBuf(pAggSup, tsCols[i], groupId, pKeyData, &curWin, &nextWin);
    setSessionWinOutputInfo(pSeUpdated, &curWin.winInfo);
    winRows = updateStateWindowInfo(&curWin, &nextWin, tsCols, groupId, pKeyColInfo, rows, i, &allEqual,
                                    pAggSup->pResultRows, pSeUpdated, pStDeleted);
5
54liuyao 已提交
3908
    if (!allEqual) {
3909
      uint64_t uid = 0;
5
54liuyao 已提交
3910 3911 3912 3913 3914
      appendOneRowToStreamSpecialBlock(pAggSup->pScanBlock, &curWin.winInfo.sessionWin.win.skey,
                                       &curWin.winInfo.sessionWin.win.ekey, &uid, &groupId, NULL);
      tSimpleHashRemove(pSeUpdated, &curWin.winInfo.sessionWin, sizeof(SSessionKey));
      doDeleteSessionWindow(pAggSup, &curWin.winInfo.sessionWin);
      releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)curWin.winInfo.pOutputBuf);
5
54liuyao 已提交
3915 3916
      continue;
    }
5
54liuyao 已提交
3917 3918
    code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &curWin.winInfo, &pResult, i, winRows, rows, numOfOutput,
                              pOperator);
5
54liuyao 已提交
3919
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
S
Shengliang Guan 已提交
3920
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
5
54liuyao 已提交
3921
    }
5
54liuyao 已提交
3922 3923
    saveSessionOutputBuf(pAggSup, &curWin.winInfo);

5
54liuyao 已提交
3924
    if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
5
54liuyao 已提交
3925
      code = saveResult(curWin.winInfo, pSeUpdated);
5
54liuyao 已提交
3926
      if (code != TSDB_CODE_SUCCESS) {
S
Shengliang Guan 已提交
3927
        T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
5
54liuyao 已提交
3928 3929
      }
    }
3930 3931

    if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
5
54liuyao 已提交
3932 3933
      SSessionKey key = {0};
      getSessionHashKey(&curWin.winInfo.sessionWin, &key);
3934 3935
      tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
    }
5
54liuyao 已提交
3936 3937 3938 3939 3940 3941 3942 3943
  }
}

static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

3944
  SExprSupp*                   pSup = &pOperator->exprSupp;
5
54liuyao 已提交
3945
  SStreamStateAggOperatorInfo* pInfo = pOperator->info;
X
Xiaoyu Wang 已提交
3946
  SOptrBasicInfo*              pBInfo = &pInfo->binfo;
L
Liu Jicong 已提交
3947
  int64_t                      maxTs = INT64_MIN;
5
54liuyao 已提交
3948
  if (pOperator->status == OP_RES_TO_RETURN) {
3949
    doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
5
54liuyao 已提交
3950
    if (pInfo->pDelRes->info.rows > 0) {
5
54liuyao 已提交
3951
      printDataBlock(pInfo->pDelRes, "single state delete");
5
54liuyao 已提交
3952 3953
      return pInfo->pDelRes;
    }
5
54liuyao 已提交
3954 3955 3956 3957 3958

    doBuildSessionResult(pOperator, pInfo->streamAggSup.pState, &pInfo->groupResInfo, pBInfo->pRes);
    if (pBInfo->pRes->info.rows > 0) {
      printDataBlock(pBInfo->pRes, "single state");
      return pBInfo->pRes;
5
54liuyao 已提交
3959
    }
5
54liuyao 已提交
3960

H
Haojun Liao 已提交
3961
    setOperatorCompleted(pOperator);
5
54liuyao 已提交
3962
    return NULL;
5
54liuyao 已提交
3963 3964
  }

X
Xiaoyu Wang 已提交
3965
  _hash_fn_t     hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
5
54liuyao 已提交
3966
  SSHashObj*     pSeUpdated = tSimpleHashInit(64, hashFn);
5
54liuyao 已提交
3967
  SOperatorInfo* downstream = pOperator->pDownstream[0];
5
54liuyao 已提交
3968
  SArray*        pUpdated = taosArrayInit(16, sizeof(SSessionKey));
5
54liuyao 已提交
3969 3970 3971 3972 3973
  while (1) {
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
    if (pBlock == NULL) {
      break;
    }
5
54liuyao 已提交
3974
    printDataBlock(pBlock, "single state recv");
3975

5
54liuyao 已提交
3976 3977 3978 3979
    if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
        pBlock->info.type == STREAM_CLEAR) {
      SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
      doDeleteTimeWindows(&pInfo->streamAggSup, pBlock, pWins);
3980
      removeSessionResults(pSeUpdated, pWins);
5
54liuyao 已提交
3981
      copyDeleteWindowInfo(pWins, pInfo->pSeDeleted);
3982 3983
      taosArrayDestroy(pWins);
      continue;
3984
    } else if (pBlock->info.type == STREAM_GET_ALL) {
5
54liuyao 已提交
3985
      getAllSessionWindow(pInfo->streamAggSup.pResultRows, pSeUpdated);
5
54liuyao 已提交
3986
      continue;
5
54liuyao 已提交
3987
    }
3988

5
54liuyao 已提交
3989 3990 3991 3992
    if (pInfo->scalarSupp.pExprInfo != NULL) {
      SExprSupp* pExprSup = &pInfo->scalarSupp;
      projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
    }
3993
    // the pDataBlock are always the same one, no need to call this again
3994
    setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
5
54liuyao 已提交
3995
    doStreamStateAggImpl(pOperator, pBlock, pSeUpdated, pInfo->pSeDeleted);
3996
    maxTs = TMAX(maxTs, pBlock->info.window.ekey);
5
54liuyao 已提交
3997
  }
3998
  pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
5
54liuyao 已提交
3999 4000
  // restore the value
  pOperator->status = OP_RES_TO_RETURN;
X
Xiaoyu Wang 已提交
4001

5
54liuyao 已提交
4002
  closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pSeUpdated);
5
54liuyao 已提交
4003
  copyUpdateResult(pSeUpdated, pUpdated);
5
54liuyao 已提交
4004 4005
  removeSessionResults(pInfo->pSeDeleted, pUpdated);
  tSimpleHashCleanup(pSeUpdated);
5
54liuyao 已提交
4006

5
54liuyao 已提交
4007
  initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
5
54liuyao 已提交
4008
  blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
5
54liuyao 已提交
4009

5
54liuyao 已提交
4010 4011 4012 4013 4014 4015
#if 0
  char* pBuf = streamStateSessionDump(pInfo->streamAggSup.pState);
  qDebug("===stream===final session%s", pBuf);
  taosMemoryFree(pBuf);
#endif

4016
  doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
5
54liuyao 已提交
4017
  if (pInfo->pDelRes->info.rows > 0) {
5
54liuyao 已提交
4018
    printDataBlock(pInfo->pDelRes, "single state delete");
5
54liuyao 已提交
4019 4020 4021
    return pInfo->pDelRes;
  }

5
54liuyao 已提交
4022 4023 4024 4025 4026
  doBuildSessionResult(pOperator, pInfo->streamAggSup.pState, &pInfo->groupResInfo, pBInfo->pRes);
  if (pBInfo->pRes->info.rows > 0) {
    printDataBlock(pBInfo->pRes, "single state");
    return pBInfo->pRes;
  }
H
Haojun Liao 已提交
4027
  setOperatorCompleted(pOperator);
5
54liuyao 已提交
4028
  return NULL;
4029 4030
}

X
Xiaoyu Wang 已提交
4031 4032 4033 4034 4035
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
                                                SExecTaskInfo* pTaskInfo) {
  SStreamStateWinodwPhysiNode* pStateNode = (SStreamStateWinodwPhysiNode*)pPhyNode;
  int32_t                      tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
  SColumnNode*                 pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr;
H
Haojun Liao 已提交
4036
  int32_t                      code = TSDB_CODE_SUCCESS;
5
54liuyao 已提交
4037

X
Xiaoyu Wang 已提交
4038 4039
  SStreamStateAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamStateAggOperatorInfo));
  SOperatorInfo*               pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
5
54liuyao 已提交
4040
  if (pInfo == NULL || pOperator == NULL) {
H
Haojun Liao 已提交
4041
    code = TSDB_CODE_OUT_OF_MEMORY;
5
54liuyao 已提交
4042 4043 4044 4045
    goto _error;
  }

  pInfo->stateCol = extractColumnFromColumnNode(pColNode);
4046
  initResultSizeInfo(&pOperator->resultInfo, 4096);
5
54liuyao 已提交
4047 4048 4049
  if (pStateNode->window.pExprs != NULL) {
    int32_t    numOfScalar = 0;
    SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalar);
H
Haojun Liao 已提交
4050
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar);
5
54liuyao 已提交
4051 4052 4053 4054 4055
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

X
Xiaoyu Wang 已提交
4056 4057
  pInfo->twAggSup = (STimeWindowAggSupp){
      .waterMark = pStateNode->window.watermark,
5
54liuyao 已提交
4058 4059
      .calTrigger = pStateNode->window.triggerType,
      .maxTs = INT64_MIN,
5
54liuyao 已提交
4060
      .minTs = INT64_MAX,
X
Xiaoyu Wang 已提交
4061
  };
4062

5
54liuyao 已提交
4063
  initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
4064

5
54liuyao 已提交
4065 4066 4067
  SExprSupp*   pSup = &pOperator->exprSupp;
  int32_t      numOfCols = 0;
  SExprInfo*   pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &numOfCols);
H
Haojun Liao 已提交
4068
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
4069
  code = initBasicInfoEx(&pInfo->binfo, pSup, pExprInfo, numOfCols, pResBlock);
5
54liuyao 已提交
4070 4071 4072
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
5
54liuyao 已提交
4073 4074 4075 4076
  int32_t keySize = sizeof(SStateKeys) + pColNode->node.resType.bytes;
  int16_t type = pColNode->node.resType.type;
  code = initStreamAggSupporter(&pInfo->streamAggSup, pSup->pCtx, numOfCols, 0, pTaskInfo->streamInfo.pState, keySize,
                                type);
5
54liuyao 已提交
4077 4078 4079 4080 4081 4082
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->primaryTsIndex = tsSlotId;
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
5
54liuyao 已提交
4083
  pInfo->pSeDeleted = tSimpleHashInit(64, hashFn);
5
54liuyao 已提交
4084
  pInfo->pDelIterator = NULL;
H
Haojun Liao 已提交
4085
  pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
5
54liuyao 已提交
4086
  pInfo->pChildren = NULL;
5
54liuyao 已提交
4087
  pInfo->ignoreExpiredData = pStateNode->window.igExpired;
5
54liuyao 已提交
4088

L
Liu Jicong 已提交
4089 4090
  setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
4091
  pOperator->fpSet =
4092
      createOperatorFpSet(optrDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo, optrDefaultBufFn, NULL);
5
54liuyao 已提交
4093
  initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup);
5
54liuyao 已提交
4094 4095 4096 4097 4098 4099 4100
  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
  return pOperator;

_error:
4101
  destroyStreamStateOperatorInfo(pInfo);
5
54liuyao 已提交
4102 4103 4104 4105
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}
4106

4107
void destroyMAIOperatorInfo(void* param) {
4108
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = (SMergeAlignedIntervalAggOperatorInfo*)param;
4109
  destroyIntervalOperatorInfo(miaInfo->intervalAggOperatorInfo);
D
dapan1121 已提交
4110
  taosMemoryFreeClear(param);
4111 4112
}

4113
static SResultRow* doSetSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, SAggSupporter* pSup) {
H
Haojun Liao 已提交
4114 4115
  SResultRow* pResult = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
  pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
4116 4117
  return pResult;
}
4118

4119 4120 4121 4122 4123 4124 4125 4126
static int32_t setSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult,
                                       SExprSupp* pExprSup, SAggSupporter* pAggSup) {
  if (*pResult == NULL) {
    *pResult = doSetSingleOutputTupleBuf(pResultRowInfo, pAggSup);
    if (*pResult == NULL) {
      return terrno;
    }
  }
4127

4128
  // set time window for current result
4129 4130
  (*pResult)->win = (*win);
  setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
4131
  return TSDB_CODE_SUCCESS;
4132 4133
}

4134
static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo,
4135
                                          SSDataBlock* pBlock, SSDataBlock* pResultBlock) {
4136
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
D
dapan1121 已提交
4137
  SIntervalAggOperatorInfo*             iaInfo = miaInfo->intervalAggOperatorInfo;
4138 4139

  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
4140
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;
4141
  SInterval*     pInterval = &iaInfo->interval;
4142

5
54liuyao 已提交
4143 4144
  int32_t  startPos = 0;
  int64_t* tsCols = extractTsCol(pBlock, iaInfo);
4145

4146 4147
  TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);

4148 4149
  // there is an result exists
  if (miaInfo->curTs != INT64_MIN) {
4150
    if (ts != miaInfo->curTs) {
4151
      finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
4152
      resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
4153
      miaInfo->curTs = ts;
4154
    }
4155 4156
  } else {
    miaInfo->curTs = ts;
4157 4158 4159
  }

  STimeWindow win = {0};
4160
  win.skey = miaInfo->curTs;
4161
  win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
4162

5
54liuyao 已提交
4163
  int32_t ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
4164 4165
  if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
    T_LONG_JMP(pTaskInfo->env, ret);
4166 4167
  }

4168 4169
  int32_t currPos = startPos;

4170
  STimeWindow currWin = win;
4171
  while (++currPos < pBlock->info.rows) {
4172
    if (tsCols[currPos] == miaInfo->curTs) {
4173
      continue;
4174 4175 4176
    }

    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true);
H
Haojun Liao 已提交
4177
    applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos,
4178
                     pBlock->info.rows, pSup->numOfExprs);
4179

4180
    finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
4181
    resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
4182
    miaInfo->curTs = tsCols[currPos];
4183

4184
    currWin.skey = miaInfo->curTs;
4185
    currWin.ekey = taosTimeAdd(currWin.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
4186 4187

    startPos = currPos;
5
54liuyao 已提交
4188
    ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
4189 4190
    if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
      T_LONG_JMP(pTaskInfo->env, ret);
4191
    }
4192 4193

    miaInfo->curTs = currWin.skey;
4194
  }
4195

4196
  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true);
H
Haojun Liao 已提交
4197
  applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos,
4198
                   pBlock->info.rows, pSup->numOfExprs);
4199 4200
}

4201
static void cleanupAfterGroupResultGen(SMergeAlignedIntervalAggOperatorInfo* pMiaInfo, SSDataBlock* pRes) {
H
Haojun Liao 已提交
4202
  pRes->info.id.groupId = pMiaInfo->groupId;
4203 4204
  pMiaInfo->curTs = INT64_MIN;
  pMiaInfo->groupId = 0;
4205 4206
}

4207
static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
S
shenglian zhou 已提交
4208
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
4209

4210 4211
  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
  SIntervalAggOperatorInfo*             pIaInfo = pMiaInfo->intervalAggOperatorInfo;
4212

4213 4214 4215 4216 4217
  SExprSupp*      pSup = &pOperator->exprSupp;
  SSDataBlock*    pRes = pIaInfo->binfo.pRes;
  SResultRowInfo* pResultRowInfo = &pIaInfo->binfo.resultRowInfo;
  SOperatorInfo*  downstream = pOperator->pDownstream[0];
  int32_t         scanFlag = MAIN_SCAN;
4218

4219 4220
  while (1) {
    SSDataBlock* pBlock = NULL;
4221
    if (pMiaInfo->prefetchedBlock == NULL) {
4222 4223
      pBlock = downstream->fpSet.getNextFn(downstream);
    } else {
4224 4225
      pBlock = pMiaInfo->prefetchedBlock;
      pMiaInfo->prefetchedBlock = NULL;
4226

H
Haojun Liao 已提交
4227
      pMiaInfo->groupId = pBlock->info.id.groupId;
4228
    }
4229

4230
    // no data exists, all query processing is done
4231
    if (pBlock == NULL) {
4232 4233 4234
      // close last unclosed time window
      if (pMiaInfo->curTs != INT64_MIN) {
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
4235 4236
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
4237
      }
4238

H
Haojun Liao 已提交
4239
      setOperatorCompleted(pOperator);
4240
      break;
4241
    }
4242

H
Haojun Liao 已提交
4243
    if (pMiaInfo->groupId == 0) {
H
Haojun Liao 已提交
4244 4245 4246
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
        pMiaInfo->groupId = pBlock->info.id.groupId;
        pRes->info.id.groupId = pMiaInfo->groupId;
H
Haojun Liao 已提交
4247 4248
      }
    } else {
H
Haojun Liao 已提交
4249
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
H
Haojun Liao 已提交
4250
        // if there are unclosed time window, close it firstly.
4251
        ASSERT(pMiaInfo->curTs != INT64_MIN);
H
Haojun Liao 已提交
4252
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
4253
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
H
Haojun Liao 已提交
4254

4255 4256
        pMiaInfo->prefetchedBlock = pBlock;
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
H
Haojun Liao 已提交
4257
        break;
5
54liuyao 已提交
4258
      } else {
H
Haojun Liao 已提交
4259
        // continue
H
Haojun Liao 已提交
4260
        pRes->info.id.groupId = pMiaInfo->groupId;
H
Haojun Liao 已提交
4261
      }
4262
    }
4263

4264
    getTableScanInfo(pOperator, &pIaInfo->inputOrder, &scanFlag);
4265
    setInputDataBlock(pSup, pBlock, pIaInfo->inputOrder, scanFlag, true);
4266
    doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes);
4267

H
Haojun Liao 已提交
4268
    doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
4269 4270 4271
    if (pRes->info.rows >= pOperator->resultInfo.capacity) {
      break;
    }
4272
  }
4273 4274 4275 4276 4277 4278 4279 4280 4281 4282 4283 4284 4285 4286 4287
}

static SSDataBlock* mergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
  SIntervalAggOperatorInfo*             iaInfo = pMiaInfo->intervalAggOperatorInfo;
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SSDataBlock* pRes = iaInfo->binfo.pRes;
  blockDataCleanup(pRes);

  if (iaInfo->binfo.mergeResultBlock) {
dengyihao's avatar
dengyihao 已提交
4288
    while (1) {
4289
      if (pOperator->status == OP_EXEC_DONE) {
4290 4291
        break;
      }
4292

4293
      if (pRes->info.rows >= pOperator->resultInfo.threshold) {
4294 4295 4296
        break;
      }

4297 4298 4299 4300
      doMergeAlignedIntervalAgg(pOperator);
    }
  } else {
    doMergeAlignedIntervalAgg(pOperator);
4301 4302 4303 4304 4305 4306 4307
  }

  size_t rows = pRes->info.rows;
  pOperator->resultInfo.totalRows += rows;
  return (rows == 0) ? NULL : pRes;
}

4308
SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode,
4309
                                                      SExecTaskInfo* pTaskInfo) {
4310
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedIntervalAggOperatorInfo));
4311
  SOperatorInfo*                        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
4312 4313 4314 4315
  if (miaInfo == NULL || pOperator == NULL) {
    goto _error;
  }

D
dapan1121 已提交
4316 4317 4318 4319 4320
  miaInfo->intervalAggOperatorInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
  if (miaInfo->intervalAggOperatorInfo == NULL) {
    goto _error;
  }

4321 4322 4323 4324 4325 4326 4327
  SInterval interval = {.interval = pNode->interval,
                        .sliding = pNode->sliding,
                        .intervalUnit = pNode->intervalUnit,
                        .slidingUnit = pNode->slidingUnit,
                        .offset = pNode->offset,
                        .precision = ((SColumnNode*)pNode->window.pTspk)->node.resType.precision};

D
dapan1121 已提交
4328
  SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
4329
  SExprSupp*                pSup = &pOperator->exprSupp;
4330

H
Haojun Liao 已提交
4331 4332 4333 4334 4335
  int32_t code = filterInitFromNode((SNode*)pNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

L
Liu Jicong 已提交
4336 4337 4338 4339
  miaInfo->curTs = INT64_MIN;
  iaInfo->win = pTaskInfo->window;
  iaInfo->inputOrder = TSDB_ORDER_ASC;
  iaInfo->interval = interval;
4340 4341
  iaInfo->primaryTsIndex = ((SColumnNode*)pNode->window.pTspk)->slotId;
  iaInfo->binfo.mergeResultBlock = pNode->window.mergeDataBlock;
4342 4343

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
H
Haojun Liao 已提交
4344
  initResultSizeInfo(&pOperator->resultInfo, 512);
4345

H
Haojun Liao 已提交
4346 4347
  int32_t    num = 0;
  SExprInfo* pExprInfo = createExprInfo(pNode->window.pFuncs, NULL, &num);
H
Haojun Liao 已提交
4348

H
Haojun Liao 已提交
4349
  code = initAggSup(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str);
4350 4351 4352
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
4353

H
Haojun Liao 已提交
4354
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pNode->window.node.pOutputDataBlockDesc);
4355
  initBasicInfo(&iaInfo->binfo, pResBlock);
4356
  initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);
4357

4358
  iaInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, num, iaInfo);
4359
  if (iaInfo->timeWindowInterpo) {
4360
    iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
4361 4362
  }

4363
  initResultRowInfo(&iaInfo->binfo.resultRowInfo);
4364
  blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity);
L
Liu Jicong 已提交
4365 4366
  setOperatorInfo(pOperator, "TimeMergeAlignedIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL,
                  false, OP_NOT_OPENED, miaInfo, pTaskInfo);
4367

4368
  pOperator->fpSet =
4369
      createOperatorFpSet(optrDummyOpenFn, mergeAlignedIntervalAgg, NULL, destroyMAIOperatorInfo, optrDefaultBufFn, NULL);
4370 4371 4372 4373 4374 4375 4376 4377 4378

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
4379
  destroyMAIOperatorInfo(miaInfo);
4380 4381 4382 4383
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}
4384 4385 4386 4387 4388

//=====================================================================================================================
// merge interval operator
typedef struct SMergeIntervalAggOperatorInfo {
  SIntervalAggOperatorInfo intervalAggOperatorInfo;
L
Liu Jicong 已提交
4389 4390 4391 4392 4393 4394
  SList*                   groupIntervals;
  SListIter                groupIntervalsIter;
  bool                     hasGroupId;
  uint64_t                 groupId;
  SSDataBlock*             prefetchedBlock;
  bool                     inputBlocksFinished;
4395 4396
} SMergeIntervalAggOperatorInfo;

S
slzhou 已提交
4397
typedef struct SGroupTimeWindow {
L
Liu Jicong 已提交
4398
  uint64_t    groupId;
S
slzhou 已提交
4399 4400 4401
  STimeWindow window;
} SGroupTimeWindow;

4402
void destroyMergeIntervalOperatorInfo(void* param) {
4403
  SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
S
slzhou 已提交
4404
  tdListFree(miaInfo->groupIntervals);
4405
  destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo);
4406

D
dapan1121 已提交
4407
  taosMemoryFreeClear(param);
4408 4409
}

L
Liu Jicong 已提交
4410 4411
static int32_t finalizeWindowResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, STimeWindow* win,
                                    SSDataBlock* pResultBlock) {
4412 4413 4414
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
  SExecTaskInfo*                 pTaskInfo = pOperatorInfo->pTaskInfo;
4415
  bool                           ascScan = (iaInfo->inputOrder == TSDB_ORDER_ASC);
4416 4417 4418
  SExprSupp*                     pExprSup = &pOperatorInfo->exprSupp;

  SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &win->skey, TSDB_KEYSIZE, tableGroupId);
L
Liu Jicong 已提交
4419 4420
  SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(
      iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
4421
  ASSERT(p1 != NULL);
5
54liuyao 已提交
4422
  //  finalizeResultRows(iaInfo->aggSup.pResultBuf, p1, pResultBlock, pTaskInfo);
4423
  tSimpleHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
4424 4425 4426
  return TSDB_CODE_SUCCESS;
}

4427 4428 4429 4430
static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock,
                                        STimeWindow* newWin) {
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
4431
  bool                           ascScan = (iaInfo->inputOrder == TSDB_ORDER_ASC);
4432

S
slzhou 已提交
4433 4434
  SGroupTimeWindow groupTimeWindow = {.groupId = tableGroupId, .window = *newWin};
  tdListAppend(miaInfo->groupIntervals, &groupTimeWindow);
4435

S
slzhou 已提交
4436 4437 4438 4439 4440
  SListIter iter = {0};
  tdListInitIter(miaInfo->groupIntervals, &iter, TD_LIST_FORWARD);
  SListNode* listNode = NULL;
  while ((listNode = tdListNext(&iter)) != NULL) {
    SGroupTimeWindow* prevGrpWin = (SGroupTimeWindow*)listNode->data;
L
Liu Jicong 已提交
4441
    if (prevGrpWin->groupId != tableGroupId) {
S
slzhou 已提交
4442 4443
      continue;
    }
4444

S
slzhou 已提交
4445
    STimeWindow* prevWin = &prevGrpWin->window;
H
Haojun Liao 已提交
4446
    if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) {
5
54liuyao 已提交
4447
      //      finalizeWindowResult(pOperatorInfo, tableGroupId, prevWin, pResultBlock);
S
slzhou 已提交
4448 4449
      tdListPopNode(miaInfo->groupIntervals, listNode);
    }
4450 4451 4452 4453 4454 4455 4456 4457 4458 4459 4460 4461 4462 4463 4464 4465
  }

  return 0;
}

static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
                                   int32_t scanFlag, SSDataBlock* pResultBlock) {
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;

  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
  SExprSupp*     pExprSup = &pOperatorInfo->exprSupp;

  int32_t     startPos = 0;
  int32_t     numOfOutput = pExprSup->numOfExprs;
  int64_t*    tsCols = extractTsCol(pBlock, iaInfo);
H
Haojun Liao 已提交
4466
  uint64_t    tableGroupId = pBlock->info.id.groupId;
4467
  bool        ascScan = (iaInfo->inputOrder == TSDB_ORDER_ASC);
4468 4469 4470
  TSKEY       blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
  SResultRow* pResult = NULL;

4471 4472
  STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval,
                                        iaInfo->inputOrder);
4473 4474 4475 4476 4477

  int32_t ret =
      setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
                             numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
  if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
S
Shengliang Guan 已提交
4478
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
4479 4480 4481 4482
  }

  TSKEY   ekey = ascScan ? win.ekey : win.skey;
  int32_t forwardRows =
4483
      getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->inputOrder);
4484
  ASSERT(forwardRows > 0);
4485 4486 4487

  // prev time window not interpolation yet.
  if (iaInfo->timeWindowInterpo) {
4488
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId);
4489 4490 4491 4492 4493 4494
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);

    // restore current time window
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
                                 numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
    if (ret != TSDB_CODE_SUCCESS) {
S
Shengliang Guan 已提交
4495
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
4496 4497 4498 4499 4500 4501 4502
    }

    // window start key interpolation
    doWindowBorderInterpolation(iaInfo, pBlock, pResult, &win, startPos, forwardRows, pExprSup);
  }

  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, true);
H
Haojun Liao 已提交
4503
  applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows,
4504
                   pBlock->info.rows, numOfOutput);
4505 4506 4507 4508 4509 4510 4511 4512
  doCloseWindow(pResultRowInfo, iaInfo, pResult);

  // output previous interval results after this interval (&win) is closed
  outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &win);

  STimeWindow nextWin = win;
  while (1) {
    int32_t prevEndPos = forwardRows - 1 + startPos;
4513 4514
    startPos =
        getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, iaInfo->inputOrder);
4515 4516 4517 4518 4519 4520 4521 4522 4523
    if (startPos < 0) {
      break;
    }

    // null data, failed to allocate more memory buffer
    int32_t code =
        setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
                               pExprSup->pCtx, numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
S
Shengliang Guan 已提交
4524
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
4525 4526 4527 4528
    }

    ekey = ascScan ? nextWin.ekey : nextWin.skey;
    forwardRows =
4529
        getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->inputOrder);
4530 4531 4532 4533 4534

    // window start(end) key interpolation
    doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup);

    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, true);
H
Haojun Liao 已提交
4535
    applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows,
4536
                     pBlock->info.rows, numOfOutput);
4537 4538 4539 4540 4541 4542 4543 4544 4545 4546 4547 4548 4549 4550 4551 4552 4553 4554 4555 4556 4557 4558 4559 4560 4561 4562 4563 4564 4565 4566 4567 4568 4569 4570 4571
    doCloseWindow(pResultRowInfo, iaInfo, pResult);

    // output previous interval results after this interval (&nextWin) is closed
    outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &nextWin);
  }

  if (iaInfo->timeWindowInterpo) {
    saveDataBlockLastRow(iaInfo->pPrevValues, pBlock, iaInfo->pInterpCols);
  }
}

static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  SMergeIntervalAggOperatorInfo* miaInfo = pOperator->info;
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
  SExprSupp*                     pExpSupp = &pOperator->exprSupp;

  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SSDataBlock* pRes = iaInfo->binfo.pRes;
  blockDataCleanup(pRes);
  blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);

  if (!miaInfo->inputBlocksFinished) {
    SOperatorInfo* downstream = pOperator->pDownstream[0];
    int32_t        scanFlag = MAIN_SCAN;
    while (1) {
      SSDataBlock* pBlock = NULL;
      if (miaInfo->prefetchedBlock == NULL) {
        pBlock = downstream->fpSet.getNextFn(downstream);
      } else {
        pBlock = miaInfo->prefetchedBlock;
H
Haojun Liao 已提交
4572
        miaInfo->groupId = pBlock->info.id.groupId;
4573 4574 4575 4576
        miaInfo->prefetchedBlock = NULL;
      }

      if (pBlock == NULL) {
S
slzhou 已提交
4577
        tdListInitIter(miaInfo->groupIntervals, &miaInfo->groupIntervalsIter, TD_LIST_FORWARD);
4578 4579 4580 4581 4582 4583
        miaInfo->inputBlocksFinished = true;
        break;
      }

      if (!miaInfo->hasGroupId) {
        miaInfo->hasGroupId = true;
H
Haojun Liao 已提交
4584 4585
        miaInfo->groupId = pBlock->info.id.groupId;
      } else if (miaInfo->groupId != pBlock->info.id.groupId) {
4586 4587 4588 4589
        miaInfo->prefetchedBlock = pBlock;
        break;
      }

4590
      getTableScanInfo(pOperator, &iaInfo->inputOrder, &scanFlag);
4591
      setInputDataBlock(pExpSupp, pBlock, iaInfo->inputOrder, scanFlag, true);
4592 4593 4594 4595 4596 4597 4598
      doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes);

      if (pRes->info.rows >= pOperator->resultInfo.threshold) {
        break;
      }
    }

H
Haojun Liao 已提交
4599
    pRes->info.id.groupId = miaInfo->groupId;
4600 4601 4602
  }

  if (miaInfo->inputBlocksFinished) {
S
slzhou 已提交
4603
    SListNode* listNode = tdListNext(&miaInfo->groupIntervalsIter);
4604

S
slzhou 已提交
4605 4606
    if (listNode != NULL) {
      SGroupTimeWindow* grpWin = (SGroupTimeWindow*)(listNode->data);
5
54liuyao 已提交
4607
      //      finalizeWindowResult(pOperator, grpWin->groupId, &grpWin->window, pRes);
H
Haojun Liao 已提交
4608
      pRes->info.id.groupId = grpWin->groupId;
4609 4610 4611 4612
    }
  }

  if (pRes->info.rows == 0) {
H
Haojun Liao 已提交
4613
    setOperatorCompleted(pOperator);
4614 4615 4616 4617 4618 4619 4620
  }

  size_t rows = pRes->info.rows;
  pOperator->resultInfo.totalRows += rows;
  return (rows == 0) ? NULL : pRes;
}

4621 4622 4623
SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode,
                                               SExecTaskInfo* pTaskInfo) {
  SMergeIntervalAggOperatorInfo* pMergeIntervalInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo));
4624
  SOperatorInfo*                 pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
4625
  if (pMergeIntervalInfo == NULL || pOperator == NULL) {
4626 4627 4628
    goto _error;
  }

5
54liuyao 已提交
4629 4630
  int32_t    num = 0;
  SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
4631 4632 4633 4634 4635 4636 4637

  SInterval interval = {.interval = pIntervalPhyNode->interval,
                        .sliding = pIntervalPhyNode->sliding,
                        .intervalUnit = pIntervalPhyNode->intervalUnit,
                        .slidingUnit = pIntervalPhyNode->slidingUnit,
                        .offset = pIntervalPhyNode->offset,
                        .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
4638

4639
  pMergeIntervalInfo->groupIntervals = tdListNew(sizeof(SGroupTimeWindow));
4640

4641
  SIntervalAggOperatorInfo* pIntervalInfo = &pMergeIntervalInfo->intervalAggOperatorInfo;
L
Liu Jicong 已提交
4642
  pIntervalInfo->win = pTaskInfo->window;
4643
  pIntervalInfo->inputOrder = TSDB_ORDER_ASC;
L
Liu Jicong 已提交
4644
  pIntervalInfo->interval = interval;
4645 4646
  pIntervalInfo->binfo.mergeResultBlock = pIntervalPhyNode->window.mergeDataBlock;
  pIntervalInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
4647 4648 4649 4650

  SExprSupp* pExprSupp = &pOperator->exprSupp;

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
4651
  initResultSizeInfo(&pOperator->resultInfo, 4096);
4652

H
Haojun Liao 已提交
4653
  int32_t code = initAggSup(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str);
4654 4655 4656
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
4657

H
Haojun Liao 已提交
4658
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pIntervalPhyNode->window.node.pOutputDataBlockDesc);
4659 4660
  initBasicInfo(&pIntervalInfo->binfo, pResBlock);
  initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win);
4661

4662 4663
  pIntervalInfo->timeWindowInterpo = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo);
  if (pIntervalInfo->timeWindowInterpo) {
4664
    pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
4665
    if (pIntervalInfo->binfo.resultRowInfo.openWindow == NULL) {
4666 4667 4668 4669
      goto _error;
    }
  }

4670
  initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo);
L
Liu Jicong 已提交
4671 4672
  setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false,
                  OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo);
4673
  pOperator->fpSet =
4674
      createOperatorFpSet(optrDummyOpenFn, doMergeIntervalAgg, NULL, destroyMergeIntervalOperatorInfo, optrDefaultBufFn, NULL);
4675 4676 4677 4678 4679 4680 4681 4682 4683

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
H
Haojun Liao 已提交
4684 4685 4686 4687
  if (pMergeIntervalInfo != NULL) {
    destroyMergeIntervalOperatorInfo(pMergeIntervalInfo);
  }

4688 4689 4690
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
L
Liu Jicong 已提交
4691
}
4692 4693 4694

static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
  SStreamIntervalOperatorInfo* pInfo = pOperator->info;
4695 4696
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
  int64_t                      maxTs = INT64_MIN;
5
54liuyao 已提交
4697
  int64_t                      minTs = INT64_MAX;
4698
  SExprSupp*                   pSup = &pOperator->exprSupp;
4699 4700 4701 4702 4703 4704

  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  if (pOperator->status == OP_RES_TO_RETURN) {
4705
    doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
4706
    if (pInfo->pDelRes->info.rows > 0) {
5
54liuyao 已提交
4707
      printDataBlock(pInfo->pDelRes, "single interval delete");
4708 4709 4710
      return pInfo->pDelRes;
    }

4711
    doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
5
54liuyao 已提交
4712 4713 4714
    if (pInfo->binfo.pRes->info.rows > 0) {
      printDataBlock(pInfo->binfo.pRes, "single interval");
      return pInfo->binfo.pRes;
4715
    }
4716 4717
    deleteIntervalDiscBuf(pInfo->pState, NULL, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark, &pInfo->interval,
                          &pInfo->delKey);
H
Haojun Liao 已提交
4718
    setOperatorCompleted(pOperator);
L
Liu Jicong 已提交
4719
    streamStateCommit(pTaskInfo->streamInfo.pState);
5
54liuyao 已提交
4720
    return NULL;
4721 4722 4723 4724 4725 4726 4727 4728 4729 4730 4731
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];

  SArray*    pUpdated = taosArrayInit(4, POINTER_BYTES);  // SResKeyPos
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  SHashObj*  pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);

  while (1) {
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
    if (pBlock == NULL) {
5
54liuyao 已提交
4732 4733
      qDebug("===stream===return data:single interval. recv datablock num:%" PRIu64, pInfo->numOfDatapack);
      pInfo->numOfDatapack = 0;
4734 4735
      break;
    }
5
54liuyao 已提交
4736
    pInfo->numOfDatapack++;
4737 4738
    printDataBlock(pBlock, "single interval recv");

4739 4740 4741
    if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
        pBlock->info.type == STREAM_CLEAR) {
      doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pUpdatedMap);
4742 4743 4744 4745 4746 4747 4748 4749 4750 4751 4752 4753 4754 4755 4756 4757 4758 4759 4760
      continue;
    } else if (pBlock->info.type == STREAM_GET_ALL) {
      getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap);
      continue;
    }

    if (pBlock->info.type == STREAM_NORMAL && pBlock->info.version != 0) {
      // set input version
      pTaskInfo->version = pBlock->info.version;
    }

    if (pInfo->scalarSupp.pExprInfo != NULL) {
      SExprSupp* pExprSup = &pInfo->scalarSupp;
      projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
    }

    // The timewindow that overlaps the timestamps of the input pBlock need to be recalculated and return to the
    // caller. Note that all the time window are not close till now.
    // the pDataBlock are always the same one, no need to call this again
4761
    setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
4762 4763 4764 4765 4766
    if (pInfo->invertible) {
      setInverFunction(pSup->pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.type);
    }

    maxTs = TMAX(maxTs, pBlock->info.window.ekey);
5
54liuyao 已提交
4767
    minTs = TMIN(minTs, pBlock->info.window.skey);
H
Haojun Liao 已提交
4768

H
Haojun Liao 已提交
4769
    doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pUpdatedMap);
4770 4771
  }
  pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
5
54liuyao 已提交
4772
  pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs);
4773
  pOperator->status = OP_RES_TO_RETURN;
4774
  removeDeleteResults(pUpdatedMap, pInfo->pDelWins);
5
54liuyao 已提交
4775
  closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdatedMap,
4776
                            pInfo->pDelWins, pOperator);
4777 4778 4779 4780 4781 4782 4783 4784 4785 4786

  void* pIte = NULL;
  while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) {
    taosArrayPush(pUpdated, pIte);
  }
  taosArraySort(pUpdated, resultrowComparAsc);

  initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
  blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
  taosHashCleanup(pUpdatedMap);
5
54liuyao 已提交
4787

4788
  doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
4789
  if (pInfo->pDelRes->info.rows > 0) {
5
54liuyao 已提交
4790
    printDataBlock(pInfo->pDelRes, "single interval delete");
4791 4792 4793
    return pInfo->pDelRes;
  }

4794
  doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
5
54liuyao 已提交
4795 4796 4797 4798 4799 4800
  if (pInfo->binfo.pRes->info.rows > 0) {
    printDataBlock(pInfo->binfo.pRes, "single interval");
    return pInfo->binfo.pRes;
  }

  return NULL;
4801 4802 4803 4804 4805
}

SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
                                                SExecTaskInfo* pTaskInfo) {
  SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo));
4806
  SOperatorInfo*               pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
4807 4808 4809 4810 4811
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }
  SStreamIntervalPhysiNode* pIntervalPhyNode = (SStreamIntervalPhysiNode*)pPhyNode;

H
Haojun Liao 已提交
4812
  int32_t    code = TSDB_CODE_SUCCESS;
4813 4814
  int32_t    numOfCols = 0;
  SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols);
4815
  ASSERT(numOfCols > 0);
H
Haojun Liao 已提交
4816

H
Haojun Liao 已提交
4817
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
4818 4819 4820 4821 4822 4823 4824 4825
  SInterval    interval = {
         .interval = pIntervalPhyNode->interval,
         .sliding = pIntervalPhyNode->sliding,
         .intervalUnit = pIntervalPhyNode->intervalUnit,
         .slidingUnit = pIntervalPhyNode->slidingUnit,
         .offset = pIntervalPhyNode->offset,
         .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision,
  };
H
Haojun Liao 已提交
4826

4827 4828 4829 4830
  STimeWindowAggSupp twAggSupp = {
      .waterMark = pIntervalPhyNode->window.watermark,
      .calTrigger = pIntervalPhyNode->window.triggerType,
      .maxTs = INT64_MIN,
5
54liuyao 已提交
4831
      .minTs = INT64_MAX,
5
54liuyao 已提交
4832
      .deleteMark = getDeleteMark(pIntervalPhyNode),
4833
  };
H
Haojun Liao 已提交
4834

4835
  ASSERT(twAggSupp.calTrigger != STREAM_TRIGGER_MAX_DELAY);
4836

4837 4838 4839 4840 4841 4842 4843
  pOperator->pTaskInfo = pTaskInfo;
  pInfo->interval = interval;
  pInfo->twAggSup = twAggSupp;
  pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired;
  pInfo->isFinal = false;

  SExprSupp* pSup = &pOperator->exprSupp;
H
Haojun Liao 已提交
4844 4845 4846 4847
  initBasicInfo(&pInfo->binfo, pResBlock);
  initStreamFunciton(pSup->pCtx, pSup->numOfExprs);
  initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);

4848
  pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
4849
  initResultSizeInfo(&pOperator->resultInfo, 4096);
4850

4851
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
H
Haojun Liao 已提交
4852
  code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
4853 4854 4855 4856
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
4857 4858 4859 4860 4861 4862 4863 4864
  if (pIntervalPhyNode->window.pExprs != NULL) {
    int32_t    numOfScalar = 0;
    SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar);
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }
4865 4866

  pInfo->invertible = allInvertible(pSup->pCtx, numOfCols);
4867
  pInfo->invertible = false;
4868 4869 4870 4871 4872
  pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
  pInfo->delIndex = 0;
  pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
  initResultRowInfo(&pInfo->binfo.resultRowInfo);

4873 4874 4875 4876 4877 4878 4879 4880 4881 4882 4883 4884 4885
  pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
  *(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
  streamStateSetNumber(pInfo->pState, -1);

  pInfo->pPhyNode = NULL;  // create new child
  pInfo->pPullDataMap = NULL;
  pInfo->pPullWins = NULL;  // SPullWindowInfo
  pInfo->pullIndex = 0;
  pInfo->pPullDataRes = NULL;
  pInfo->isFinal = false;
  pInfo->pChildren = NULL;
  pInfo->delKey.ts = INT64_MAX;
  pInfo->delKey.groupId = 0;
5
54liuyao 已提交
4886
  pInfo->numOfDatapack = 0;
4887

L
Liu Jicong 已提交
4888 4889
  setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
4890 4891
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamIntervalAgg, NULL,
                                         destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL);
4892

5
54liuyao 已提交
4893
  initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup);
4894 4895 4896 4897 4898 4899 4900 4901
  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
4902
  destroyStreamFinalIntervalOperatorInfo(pInfo);
4903 4904 4905 4906
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}
H
Haojun Liao 已提交
4907