timewindowoperator.c 187.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15
#include "executorimpl.h"
16
#include "filter.h"
X
Xiaoyu Wang 已提交
17
#include "function.h"
5
54liuyao 已提交
18
#include "functionMgt.h"
H
Haojun Liao 已提交
19
#include "tcommon.h"
5
54liuyao 已提交
20
#include "tcompare.h"
L
Liu Jicong 已提交
21
#include "tdatablock.h"
H
Haojun Liao 已提交
22
#include "tfill.h"
23
#include "ttime.h"
24

L
Liu Jicong 已提交
25
// Evaluates the `isFinal` member of the struct that `op` points to.
#define IS_FINAL_OP(op)    ((op)->isFinal)
// 1000 * 60 * 60 * 24 * 365 * 10, i.e. ten 365-day years (presumably expressed in milliseconds -- confirm).
// The definition must not end with ';': a trailing semicolon would silently cut off any expression
// the macro is embedded in. The misspelled name is kept because other code refers to it.
#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL)
H
Haojun Liao 已提交
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51

// State carried by a session-window aggregate operator (description inferred from the type name).
typedef struct SSessionAggOperatorInfo {
  SOptrBasicInfo binfo;
  SAggSupporter aggSup;
  SGroupResInfo groupResInfo;
  SWindowRowsSup winSup;
  bool reptScan;     // next round scan
  int64_t gap;       // gap of the session window
  int32_t tsSlotId;  // slot id of the primary timestamp
  STimeWindowAggSupp twAggSup;
} SSessionAggOperatorInfo;

// State carried by a state-window operator (description inferred from the type name).
typedef struct SStateWindowOperatorInfo {
  SOptrBasicInfo binfo;
  SAggSupporter aggSup;
  SExprSupp scalarSup;
  SGroupResInfo groupResInfo;
  SWindowRowsSup winSup;
  SColumn stateCol;  // NOTE(review): was annotated "start row index", which does not fit an SColumn -- confirm
  bool hasKey;
  SStateKeys stateKey;
  int32_t tsSlotId;  // slot id of the primary timestamp column
  STimeWindowAggSupp twAggSup;
} SStateWindowOperatorInfo;

52 53 54 55 56
// Selects which border of a result row an interpolation call refers to.
typedef enum SResultTsInterpType {
  RESULT_ROW_START_INTERP = 1,  // the start key of the window
  RESULT_ROW_END_INTERP = 2     // the end key of the window
} SResultTsInterpType;

5
54liuyao 已提交
57 58
// One pull-window entry: a time window, the group it belongs to and a second ("cal") window.
typedef struct SPullWindowInfo {
  STimeWindow window;
  uint64_t groupId;
  STimeWindow calWin;
} SPullWindowInfo;

63 64
// Entry of the open-window list: position of a result row plus the id of its group.
typedef struct SOpenWindowInfo {
  SResultRowPosition pos;
  uint64_t groupId;
} SOpenWindowInfo;

68 69
static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo);

L
Liu Jicong 已提交
70 71
static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult,
                                              uint64_t groupId);
72 73
static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult);

X
Xiaoyu Wang 已提交
74
// Returns the first entry of tsCols, or win->skey when no timestamp array is supplied.
static TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols) {
  if (tsCols != NULL) {
    return tsCols[0];
  }

  return win->skey;
}
75 76 77

static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, bool masterscan,
                                      SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
78
                                      int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
79 80 81 82 83 84 85 86 87 88 89
                                      SExecTaskInfo* pTaskInfo) {
  SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
                                                  masterscan, tableGroupId, pTaskInfo, true, pAggSup);

  if (pResultRow == NULL) {
    *pResult = NULL;
    return TSDB_CODE_SUCCESS;
  }

  // set time window for current result
  pResultRow->win = (*win);
90

91
  *pResult = pResultRow;
92
  setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
93

94 95 96 97 98 99 100 101 102 103 104 105 106
  return TSDB_CODE_SUCCESS;
}

// Writes window information into the int64 array behind pColData->pData:
//   [2] duration, [3] window start key, [4] window end key.
// With includeEndpoint set, both the duration and the end key are enlarged by one.
static void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, bool includeEndpoint) {
  int64_t*      slots = (int64_t*)pColData->pData;
  const int32_t extra = includeEndpoint ? 1 : 0;

  slots[2] = pWin->ekey - pWin->skey + extra;
  slots[3] = pWin->skey;
  slots[4] = pWin->ekey + extra;
}

107
// Accounts for one more row in the current window: the window end and prevTs move to ts,
// the row counter grows by one and the group id is recorded.
static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) {
  pRowSup->groupId = groupId;
  pRowSup->prevTs = ts;
  pRowSup->win.ekey = ts;
  ++pRowSup->numOfRows;
}

dengyihao's avatar
dengyihao 已提交
114 115
// Begins a new window at row rowIndex: remembers the row index and group id, resets the row
// counter and takes tsList[rowIndex] as the window start key.
static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex,
                                     uint64_t groupId) {
  pRowSup->groupId = groupId;
  pRowSup->startRowIndex = rowIndex;
  pRowSup->win.skey = tsList[rowIndex];
  pRowSup->numOfRows = 0;
}

L
Liu Jicong 已提交
122 123
// Counts how many rows, starting at index pos of pData, are covered when advancing up to ekey.
// searchFn is run on the sub-array that begins at pos; a non-negative result `end` yields `end` rows,
// plus one more when the row found equals ekey. A negative result yields 0.
// `order` is only forwarded to searchFn: the same steps are taken for both scan directions.
FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos,
                                            int32_t order, int64_t* pData) {
  int32_t forwardRows = 0;

  const int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order);
  if (end >= 0) {
    // the row that matches ekey exactly is part of the window as well
    forwardRows = (pData[end + pos] == ekey) ? end + 1 : end;
  }

  assert(forwardRows >= 0);
  return forwardRows;
}

5
54liuyao 已提交
158
// Binary search over the TSKEY array behind pValue (num entries, assumed to be sorted according to `order`).
// Returns the position of an entry equal to key when the search hits one; otherwise the position of the
// first entry beyond key in scan direction (larger for ascending, smaller for descending order).
// Returns -1 when num <= 0 or when no such entry exists.
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
  if (num <= 0) {
    return -1;
  }

  assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);

  const TSKEY* keys = (const TSKEY*)pValue;
  // Comparison results are multiplied by this sign, so the descending search can reuse the ascending logic.
  const int32_t sign = (order == TSDB_ORDER_DESC) ? -1 : 1;

  int32_t lo = 0;
  int32_t hi = num - 1;

  for (;;) {
    const int32_t atLo = sign * ((key > keys[lo]) - (key < keys[lo]));
    if (atLo <= 0) {
      return lo;
    }

    const int32_t atHi = sign * ((key > keys[hi]) - (key < keys[hi]));
    if (atHi == 0) {
      return hi;
    }

    if (atHi > 0) {
      // key lies beyond the upper bound: the entry after it qualifies, if there is one
      return (hi + 1 >= num) ? -1 : hi + 1;
    }

    const int32_t mid = ((hi - lo + 1) >> 1) + lo;
    const int32_t atMid = sign * ((key > keys[mid]) - (key < keys[mid]));
    if (atMid < 0) {
      hi = mid - 1;
    } else if (atMid > 0) {
      lo = mid + 1;
    } else {
      return mid;
    }
  }
}

X
Xiaoyu Wang 已提交
229 230
// Returns how many rows of the block, counted from startPos, are consumed by the window ending at ekey.
//
// If the window ends inside the block (ekey before the block's end key for ascending scans, after the
// block's start key for descending scans) and a timestamp column is available, the count comes from
// getForwardStepsInBlock(); otherwise all rows from startPos to the end of the block are taken.
// When `item` is not NULL its lastKey is updated to the last consumed key plus the scan direction step.
//
// The lastKey update in the search branch is skipped when it would need an index below 0 (no row
// consumed and startPos == 0); the previous code read pPrimaryColumn[-1] in that case.
//
// NOTE(review): the "whole block" branch uses window.ekey for both directions -- confirm this is
// intended for descending scans.
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
                                 __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order) {
  assert(startPos >= 0 && startPos < pDataBlockInfo->rows);

  int32_t num = -1;
  int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);

  bool endsInBlock =
      (order == TSDB_ORDER_ASC) ? (ekey < pDataBlockInfo->window.ekey) : (ekey > pDataBlockInfo->window.skey);

  if (endsInBlock && pPrimaryColumn) {
    num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);

    int32_t lastRow = startPos + (num - 1);
    if (item != NULL && lastRow >= 0) {  // lastRow is -1 when nothing was consumed at startPos == 0
      item->lastKey = pPrimaryColumn[lastRow] + step;
    }
  } else {
    num = pDataBlockInfo->rows - startPos;
    if (item != NULL) {
      item->lastKey = pDataBlockInfo->window.ekey + step;
    }
  }

  assert(num >= 0);
  return num;
}

// Moves *tw to the next window in scan direction.
// For every unit except 'n' and 'y' the start key is shifted by `sliding` (times the direction factor)
// and the end key becomes start + interval - 1.
// For 'n'/'y' the start is converted to a broken-down local time, advanced by `interval` months
// ('y' counts as 12 months each) and converted back; the end key is one further interval later, minus one.
static void getNextTimeWindow(SInterval* pInterval, int32_t precision, int32_t order, STimeWindow* tw) {
  const int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order);
  const bool    calendarUnit = (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y');

  if (!calendarUnit) {
    tw->skey += pInterval->sliding * factor;
    tw->ekey = tw->skey + pInterval->interval - 1;
    return;
  }

  int64_t months = pInterval->interval;
  if (pInterval->intervalUnit == 'y') {
    months *= 12;
  }

  // window start expressed in seconds
  int64_t startSec = convertTimePrecision(tw->skey, precision, TSDB_TIME_PRECISION_MILLI) / 1000;

  struct tm tm;
  time_t    t = (time_t)startSec;
  taosLocalTime(&t, &tm);

  // new window start: shift by the interval in scan direction
  int mon = (int)(tm.tm_year * 12 + tm.tm_mon + months * factor);
  tm.tm_year = mon / 12;
  tm.tm_mon = mon % 12;
  tw->skey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, precision);

  // new window end: one interval after the start, minus one time unit
  mon = (int)(mon + months);
  tm.tm_year = mon / 12;
  tm.tm_mon = mon % 12;
  tw->ekey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, precision);
  tw->ekey -= 1;
}

5
54liuyao 已提交
299 300 301 302
// Public wrapper around getNextTimeWindow() that takes the precision from the interval itself.
void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order) {
  const int32_t precision = pInterval->precision;
  getNextTimeWindow(pInterval, precision, order, tw);
}

303 304
// Computes, for every expression that is an interval-interpolation function, the linearly interpolated
// value at windowKey between the points (prevTs, previous value) and (curTs, current value) and stores it
// as the start or end point of the function context, depending on `type`.
// The previous value comes from pPrevValues when prevRowIndex is -1, otherwise from row prevRowIndex of
// the column. Expressions that are not interpolation functions only get start.key reset to INT64_MIN.
void doTimeWindowInterpolation(SArray* pPrevValues, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs,
                               int32_t curRowIndex, TSKEY windowKey, int32_t type, SExprSupp* pSup) {
  SqlFunctionCtx* pCtx = pSup->pCtx;

  // Position in pPrevValues of the saved value for the next interpolation function. It starts at 1,
  // presumably because entry 0 holds the previous timestamp -- confirm.
  int32_t prevSlot = 1;

  for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
    SqlFunctionCtx* pFunc = &pCtx[k];

    if (!fmIsIntervalInterpoFunc(pFunc->functionId)) {
      pFunc->start.key = INT64_MIN;
      continue;
    }

    SFunctParam*     pParam = &pFunc->param[0];
    SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, pParam->pCol->slotId);

    ASSERT(pColInfo->info.type == pParam->pCol->type && curTs != windowKey);

    double v1 = 0, v2 = 0, v = 0;
    if (prevRowIndex == -1) {
      SGroupKeys* p = taosArrayGet(pPrevValues, prevSlot);
      GET_TYPED_DATA(v1, double, pColInfo->info.type, p->pData);
    } else {
      GET_TYPED_DATA(v1, double, pColInfo->info.type, colDataGetData(pColInfo, prevRowIndex));
    }

    GET_TYPED_DATA(v2, double, pColInfo->info.type, colDataGetData(pColInfo, curRowIndex));

    SPoint point1 = (SPoint){.key = prevTs, .val = &v1};
    SPoint point2 = (SPoint){.key = curTs, .val = &v2};
    SPoint point = (SPoint){.key = windowKey, .val = &v};

    taosGetLinearInterpolationVal(&point, TSDB_DATA_TYPE_DOUBLE, &point1, &point2, TSDB_DATA_TYPE_DOUBLE);

    if (type == RESULT_ROW_START_INTERP) {
      pFunc->start.key = point.key;
      pFunc->start.val = v;
    } else {
      pFunc->end.key = point.key;
      pFunc->end.val = v;
    }

    prevSlot += 1;
  }
}

// Marks the start key (type == RESULT_ROW_START_INTERP) or otherwise the end key of the first
// numOfOutput contexts with INT64_MIN.
static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t type) {
  const bool atStart = (type == RESULT_ROW_START_INTERP);

  for (int32_t i = 0; i < numOfOutput; ++i) {
    if (atStart) {
      pCtx[i].start.key = INT64_MIN;
    } else {
      pCtx[i].end.key = INT64_MIN;
    }
  }
}

384 385
// Prepares the start-border interpolation of a window for the row at `pos`. Always returns true.
// No interpolation is set up (the start keys are reset instead) when the row sits exactly on the window
// border, or when no previous timestamp has been saved and pos is 0. Otherwise the value is interpolated
// between the previous row (the saved one when pos is 0) and the current row.
static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, int32_t pos, SSDataBlock* pBlock,
                                              const TSKEY* tsCols, STimeWindow* win, SExprSupp* pSup) {
  const TSKEY curTs = tsCols[pos];

  SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
  const TSKEY lastTs = *(int64_t*)pTsKey->pData;

  // border at which the scan enters the window
  const TSKEY key = (pInfo->inputOrder == TSDB_ORDER_ASC) ? win->skey : win->ekey;

  const bool startsOnRow = (key == curTs);
  const bool firstWindow = (pTsKey->isNull && pos == 0);
  if (startsOnRow || firstWindow) {
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
    return true;
  }

  const TSKEY prevTs = (pos == 0) ? lastTs : tsCols[pos - 1];
  doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, pos - 1, curTs, pos, key,
                            RESULT_ROW_START_INTERP, pSup);
  return true;
}

413 414 415
// Prepares the end-border interpolation of a window whose last row in this block is endRowIndex.
// Returns false (end keys reset) when the window border lies beyond blockEkey in scan direction, i.e. the
// window does not end in this block. Returns true otherwise: without interpolation when the last row sits
// exactly on the border, else with a value interpolated between that row and the following one.
static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex,
                                            SArray* pDataBlock, const TSKEY* tsCols, TSKEY blockEkey,
                                            STimeWindow* win) {
  const int32_t order = pInfo->inputOrder;
  const TSKEY   lastRowKey = tsCols[endRowIndex];
  const TSKEY   key = (order == TSDB_ORDER_ASC) ? win->ekey : win->skey;

  const bool beyondBlock =
      ((order == TSDB_ORDER_ASC) && key > blockEkey) || ((order == TSDB_ORDER_DESC) && key < blockEkey);
  if (beyondBlock) {
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
    return false;
  }

  if (key == lastRowKey) {
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
    return true;
  }

  const int32_t nextRowIndex = endRowIndex + 1;
  assert(nextRowIndex >= 0);

  const TSKEY nextKey = tsCols[nextRowIndex];
  doTimeWindowInterpolation(pInfo->pPrevValues, pDataBlock, lastRowKey, endRowIndex, nextKey, nextRowIndex, key,
                            RESULT_ROW_END_INTERP, pSup);
  return true;
}

442 443
// Returns false only if interval and sliding differ and the window lies completely outside
// [calStart, calEnd]; in every other case the result is true.
bool inCalSlidingWindow(SInterval* pInterval, STimeWindow* pWin, TSKEY calStart, TSKEY calEnd) {
  if (pInterval->interval == pInterval->sliding) {
    return true;
  }

  return pWin->ekey >= calStart && pWin->skey <= calEnd;
}

449 450 451 452
// Same check as inCalSlidingWindow(), with the range taken from the block's calWin.
bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo) {
  const TSKEY calStart = pBlockInfo->calWin.skey;
  const TSKEY calEnd = pBlockInfo->calWin.ekey;
  return inCalSlidingWindow(pInterval, pWin, calStart, calEnd);
}

453
static int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
5
54liuyao 已提交
454
                                      TSKEY* primaryKeys, int32_t prevPosition, int32_t order) {
X
Xiaoyu Wang 已提交
455
  bool ascQuery = (order == TSDB_ORDER_ASC);
456 457 458 459 460 461 462 463 464 465

  int32_t precision = pInterval->precision;
  getNextTimeWindow(pInterval, precision, order, pNext);

  // next time window is not in current block
  if ((pNext->skey > pDataBlockInfo->window.ekey && order == TSDB_ORDER_ASC) ||
      (pNext->ekey < pDataBlockInfo->window.skey && order == TSDB_ORDER_DESC)) {
    return -1;
  }

5
54liuyao 已提交
466 467 468 469
  if (!inSlidingWindow(pInterval, pNext, pDataBlockInfo) && order == TSDB_ORDER_ASC) {
    return -1;
  }

470
  TSKEY   skey = ascQuery ? pNext->skey : pNext->ekey;
471 472 473 474
  int32_t startPos = 0;

  // tumbling time window query, a special case of sliding time window query
  if (pInterval->sliding == pInterval->interval && prevPosition != -1) {
475
    startPos = prevPosition + 1;
476
  } else {
477
    if ((skey <= pDataBlockInfo->window.skey && ascQuery) || (skey >= pDataBlockInfo->window.ekey && !ascQuery)) {
478 479
      startPos = 0;
    } else {
480
      startPos = binarySearchForKey((char*)primaryKeys, pDataBlockInfo->rows, skey, order);
481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523
    }
  }

  /* interp query with fill should not skip time window */
  //  if (pQueryAttr->pointInterpQuery && pQueryAttr->fillType != TSDB_FILL_NONE) {
  //    return startPos;
  //  }

  /*
   * This time window does not cover any data, try next time window,
   * this case may happen when the time window is too small
   */
  if (primaryKeys == NULL) {
    if (ascQuery) {
      assert(pDataBlockInfo->window.skey <= pNext->ekey);
    } else {
      assert(pDataBlockInfo->window.ekey >= pNext->skey);
    }
  } else {
    if (ascQuery && primaryKeys[startPos] > pNext->ekey) {
      TSKEY next = primaryKeys[startPos];
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
        pNext->skey = taosTimeTruncate(next, pInterval, precision);
        pNext->ekey = taosTimeAdd(pNext->skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
      } else {
        pNext->ekey += ((next - pNext->ekey + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
        pNext->skey = pNext->ekey - pInterval->interval + 1;
      }
    } else if ((!ascQuery) && primaryKeys[startPos] < pNext->skey) {
      TSKEY next = primaryKeys[startPos];
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
        pNext->skey = taosTimeTruncate(next, pInterval, precision);
        pNext->ekey = taosTimeAdd(pNext->skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
      } else {
        pNext->skey -= ((pNext->skey - next + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
        pNext->ekey = pNext->skey + pInterval->interval - 1;
      }
    }
  }

  return startPos;
}

524
// Reports whether the start (or end, depending on `type`) interpolation flag of the result row is set.
static bool isResultRowInterpolated(SResultRow* pResult, SResultTsInterpType type) {
  ASSERT(pResult != NULL && (type == RESULT_ROW_START_INTERP || type == RESULT_ROW_END_INTERP));

  return (type == RESULT_ROW_START_INTERP) ? (pResult->startInterp == true) : (pResult->endInterp == true);
}

// Sets the start (or end, depending on `type`) interpolation flag of the result row.
static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
  assert(pResult != NULL && (type == RESULT_ROW_START_INTERP || type == RESULT_ROW_END_INTERP));

  switch (type) {
    case RESULT_ROW_START_INTERP:
      pResult->startInterp = true;
      break;
    default:
      pResult->endInterp = true;
      break;
  }
}

542 543
// Sets up the start and end border interpolation of one window for the rows
// [startPos, startPos + forwardRows) of pBlock. Does nothing unless pInfo->timeWindowInterpo is set
// and the block carries column data. A border whose interpolation flag is already set on pResult only
// gets its keys reset; otherwise the border is processed and flagged when the helper reports success.
static void doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataBlock* pBlock, SResultRow* pResult,
                                        STimeWindow* win, int32_t startPos, int32_t forwardRows, SExprSupp* pSup) {
  if (!pInfo->timeWindowInterpo) {
    return;
  }

  ASSERT(pBlock != NULL);
  if (pBlock->pDataBlock == NULL) {
    return;
  }

  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
  TSKEY*           tsCols = (TSKEY*)(pColInfo->pData);

  // start border
  if (isResultRowInterpolated(pResult, RESULT_ROW_START_INTERP)) {
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
  } else if (setTimeWindowInterpolationStartTs(pInfo, startPos, pBlock, tsCols, win, pSup)) {
    setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
  }

  // end border
  if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
    return;
  }

  const int32_t endRowIndex = startPos + forwardRows - 1;
  const TSKEY   endKey = (pInfo->inputOrder == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey;
  if (setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, pBlock->pDataBlock, tsCols, endKey, win)) {
    setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
  }
}

587 588
// For every entry k of pPrevKeys, copies the last non-NULL value of the matching column (pCols[k] gives
// the slot id inside pBlock) into the entry's buffer. Entries whose column holds only NULLs are left
// untouched. Nothing is done when the block has no column data.
static void saveDataBlockLastRow(SArray* pPrevKeys, const SSDataBlock* pBlock, SArray* pCols) {
  if (pBlock->pDataBlock == NULL) {
    return;
  }

  size_t num = taosArrayGetSize(pPrevKeys);
  for (int32_t k = 0; k < num; ++k) {
    SColumn* pc = taosArrayGet(pCols, k);

    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pc->slotId);

    SGroupKeys* pkey = taosArrayGet(pPrevKeys, k);

    // walk backwards to the most recent row that is not NULL
    for (int32_t i = pBlock->info.rows - 1; i >= 0; --i) {
      if (colDataIsNull_s(pColInfo, i)) {
        continue;
      }

      char* val = colDataGetData(pColInfo, i);
      if (IS_VAR_DATA_TYPE(pkey->type)) {
        // verify the length before copying, so an oversized value is caught ahead of the write
        ASSERT(varDataTLen(val) <= pkey->bytes);
        memcpy(pkey->pData, val, varDataTLen(val));
      } else {
        memcpy(pkey->pData, val, pkey->bytes);
      }

      break;
    }
  }
}

617 618 619 620
// Walks the open-window list from its head until the entry at position *p is reached and finishes the
// end-border interpolation of every window in front of it:
//   - entries that are already closed are just removed from the list;
//   - for the others the result row is re-bound, the end border is interpolated between the saved
//     previous row and the first row of pBlock (only if the window belongs to the block's group), the
//     aggregate functions are applied, and the window is closed and removed from the list.
// Failures leave through T_LONG_JMP on pTaskInfo->env.
//
// setTimeWindowOutputBuf() signals "no result row" through a NULL row while still returning
// TSDB_CODE_SUCCESS, therefore the row pointer is checked in addition to the return code.
static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t numOfExprs, SResultRowInfo* pResultRowInfo,
                                       SSDataBlock* pBlock, int32_t scanFlag, int64_t* tsCols, SResultRowPosition* p) {
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;

  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
  SExprSupp*                pSup = &pOperatorInfo->exprSupp;

  int32_t startPos = 0;
  int32_t numOfOutput = pSup->numOfExprs;

  SResultRow* pResult = NULL;

  while (1) {
    SListNode*          pn = tdListGetHead(pResultRowInfo->openWindow);
    SOpenWindowInfo*    pOpenWin = (SOpenWindowInfo*)pn->data;
    uint64_t            groupId = pOpenWin->groupId;
    SResultRowPosition* p1 = &pOpenWin->pos;
    if (p->pageId == p1->pageId && p->offset == p1->offset) {
      break;  // reached the current window, everything in front of it has been handled
    }

    SResultRow* pr = getResultRowByPos(pInfo->aggSup.pResultBuf, p1, false);
    if (NULL == pr) {
      T_LONG_JMP(pTaskInfo->env, terrno);
    }

    ASSERT(pr->offset == p1->offset && pr->pageId == p1->pageId);

    if (pr->closed) {
      ASSERT(isResultRowInterpolated(pr, RESULT_ROW_START_INTERP) &&
             isResultRowInterpolated(pr, RESULT_ROW_END_INTERP));
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
      taosMemoryFree(pNode);
      continue;
    }

    STimeWindow w = pr->win;
    int32_t     ret = setTimeWindowOutputBuf(pResultRowInfo, &w, (scanFlag == MAIN_SCAN), &pResult, groupId, pSup->pCtx,
                                             numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
    if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }

    ASSERT(!isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP));

    SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
    int64_t     prevTs = *(int64_t*)pTsKey->pData;
    if (groupId == pBlock->info.id.groupId) {
      doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, -1, tsCols[startPos], startPos, w.ekey,
                                RESULT_ROW_END_INTERP, pSup);
    }

    setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
    setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP);

    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, true);
    applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0,
                                    pBlock->info.rows, numOfExprs);

    if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
      closeResultRow(pr);
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
      taosMemoryFree(pNode);
    } else {  // the remaining windows cannot be closed yet
      break;
    }
  }
}
685

5
54liuyao 已提交
686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741
// Comparator used by binarySearchCom(): compares pKey with entry `index` of `data` and returns a value
// below, equal to or above zero.
typedef int32_t (*__compare_fn_t)(void* pKey, void* data, int32_t index);

// Binary search over `num` entries of keyList (assumed to be in ascending comparator order), with all
// comparisons delegated to comparefn.
//   TSDB_ORDER_DESC: returns the position of an entry equal to pKey when one is hit, otherwise the last
//                    position whose entry compares below pKey (-1 when there is none).
//   otherwise:       returns the position of an entry equal to pKey when one is hit, otherwise the first
//                    position whose entry compares above pKey (-1 when there is none).
// Returns -1 when num <= 0.
int32_t binarySearchCom(void* keyList, int num, void* pKey, int order, __compare_fn_t comparefn) {
  if (num <= 0) {
    return -1;
  }

  int lo = 0;
  int hi = num - 1;

  if (order == TSDB_ORDER_DESC) {
    for (;;) {
      if (comparefn(pKey, keyList, hi) >= 0) {
        return hi;
      }
      if (comparefn(pKey, keyList, lo) == 0) {
        return lo;
      }
      if (comparefn(pKey, keyList, lo) < 0) {
        return lo - 1;
      }

      const int mid = ((hi - lo + 1) >> 1) + lo;
      if (comparefn(pKey, keyList, mid) < 0) {
        hi = mid - 1;
      } else if (comparefn(pKey, keyList, mid) > 0) {
        lo = mid + 1;
      } else {
        return mid;
      }
    }
  }

  for (;;) {
    if (comparefn(pKey, keyList, lo) <= 0) {
      return lo;
    }
    if (comparefn(pKey, keyList, hi) == 0) {
      return hi;
    }
    if (comparefn(pKey, keyList, hi) > 0) {
      // pKey is above the upper bound: the following entry qualifies, if there is one
      return (hi + 1 >= num) ? -1 : hi + 1;
    }

    const int mid = ((hi - lo + 1) >> 1) + lo;
    if (comparefn(pKey, keyList, mid) < 0) {
      hi = mid - 1;
    } else if (comparefn(pKey, keyList, mid) > 0) {
      lo = mid + 1;
    } else {
      return mid;
    }
  }
}

5
54liuyao 已提交
742
typedef int64_t (*__get_value_fn_t)(void* data, int32_t index);
743

X
Xiaoyu Wang 已提交
744 745 746
int32_t binarySearch(void* keyList, int num, TSKEY key, int order, __get_value_fn_t getValuefn) {
  int firstPos = 0, lastPos = num - 1, midPos = -1;
  int numOfRows = 0;
5
54liuyao 已提交
747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792

  if (num <= 0) return -1;
  if (order == TSDB_ORDER_DESC) {
    // find the first position which is smaller or equal than the key
    while (1) {
      if (key >= getValuefn(keyList, lastPos)) return lastPos;
      if (key == getValuefn(keyList, firstPos)) return firstPos;
      if (key < getValuefn(keyList, firstPos)) return firstPos - 1;

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;

      if (key < getValuefn(keyList, midPos)) {
        lastPos = midPos - 1;
      } else if (key > getValuefn(keyList, midPos)) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }

  } else {
    // find the first position which is bigger or equal than the key
    while (1) {
      if (key <= getValuefn(keyList, firstPos)) return firstPos;
      if (key == getValuefn(keyList, lastPos)) return lastPos;

      if (key > getValuefn(keyList, lastPos)) {
        lastPos = lastPos + 1;
        if (lastPos >= num)
          return -1;
        else
          return lastPos;
      }

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;

      if (key < getValuefn(keyList, midPos)) {
        lastPos = midPos - 1;
      } else if (key > getValuefn(keyList, midPos)) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }
793 794
  }

5
54liuyao 已提交
795 796 797
  return midPos;
}

5
54liuyao 已提交
798
int32_t comparePullWinKey(void* pKey, void* data, int32_t index) {
L
Liu Jicong 已提交
799
  SArray*          res = (SArray*)data;
5
54liuyao 已提交
800
  SPullWindowInfo* pos = taosArrayGet(res, index);
L
Liu Jicong 已提交
801
  SPullWindowInfo* pData = (SPullWindowInfo*)pKey;
5
54liuyao 已提交
802
  if (pData->groupId > pos->groupId) {
5
54liuyao 已提交
803
    return 1;
5
54liuyao 已提交
804 805 806 807 808 809 810 811
  } else if (pData->groupId < pos->groupId) {
    return -1;
  }

  if (pData->window.skey > pos->window.ekey) {
    return 1;
  } else if (pData->window.ekey < pos->window.skey) {
    return -1;
5
54liuyao 已提交
812
  }
5
54liuyao 已提交
813
  return 0;
5
54liuyao 已提交
814 815 816 817 818 819 820 821
}

// Records a pull window in pPullWins.
// The array is searched with binarySearchCom()/comparePullWinKey(). When the
// located entry compares equal to pPullInfo (same group, overlapping window) the
// entry is widened in place so that both its window and its calWin cover
// pPullInfo; otherwise pPullInfo is inserted next to the located entry
// (presumably keeping the array ordered - confirm against binarySearchCom()).
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the insert fails.
static int32_t savePullWindow(SPullWindowInfo* pPullInfo, SArray* pPullWins) {
  int32_t numOfWins = taosArrayGetSize(pPullWins);
  int32_t insertAt = binarySearchCom(pPullWins, numOfWins, pPullInfo, TSDB_ORDER_DESC, comparePullWinKey);

  if (insertAt == -1) {
    // nothing found by the search: insert at the front
    insertAt = 0;
  } else {
    int32_t cmp = comparePullWinKey(pPullInfo, pPullWins, insertAt);
    if (cmp == 0) {
      // merge into the overlapping entry instead of adding a new one
      SPullWindowInfo* pExisting = taosArrayGet(pPullWins, insertAt);
      pExisting->window.skey = TMIN(pExisting->window.skey, pPullInfo->window.skey);
      pExisting->window.ekey = TMAX(pExisting->window.ekey, pPullInfo->window.ekey);
      pExisting->calWin.skey = TMIN(pExisting->calWin.skey, pPullInfo->calWin.skey);
      pExisting->calWin.ekey = TMAX(pExisting->calWin.ekey, pPullInfo->calWin.ekey);
      return TSDB_CODE_SUCCESS;
    }

    if (cmp > 0) {
      // the new window sorts after the located entry
      insertAt += 1;
    }
  }

  void* pInserted = taosArrayInsert(pPullWins, insertAt, pPullInfo);
  return (pInserted != NULL) ? TSDB_CODE_SUCCESS : TSDB_CODE_OUT_OF_MEMORY;
}

5
54liuyao 已提交
840 841 842
// Stores a copy of winInfo in pStUpdated, keyed by its session key.
// winInfo is received by value; before it is stored, the end key of its session
// window is overwritten with the start key, so both the hash key and the stored
// value carry ekey == skey. Returns whatever tSimpleHashPut() returns.
static int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated) {
  winInfo.sessionWin.win.ekey = winInfo.sessionWin.win.skey;

  int32_t code = tSimpleHashPut(pStUpdated, &winInfo.sessionWin, sizeof(SSessionKey), &winInfo, sizeof(winInfo));
  return code;
}

5
54liuyao 已提交
845 846 847 848 849 850 851 852
// Allocates a SResKeyPos describing the window (ts, groupId) located at
// (pageId, offset) and stores the pointer to it in pUpdatedMap under the
// SWinKey {ts, groupId}.
// The allocation reserves sizeof(uint64_t) extra bytes after the struct; the
// timestamp is written there through the `key` member.
// Returns TSDB_CODE_OUT_OF_MEMORY only when the allocation fails. A failing
// taosHashPut() is not reported: the freshly allocated entry is released and
// TSDB_CODE_SUCCESS is still returned.
static int32_t saveWinResult(int64_t ts, int32_t pageId, int32_t offset, uint64_t groupId, SHashObj* pUpdatedMap) {
  SResKeyPos* pEntry = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
  if (pEntry == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pEntry->groupId = groupId;
  pEntry->pos = (SResultRowPosition){.pageId = pageId, .offset = offset};
  *(int64_t*)pEntry->key = ts;

  SWinKey winKey = {.ts = ts, .groupId = groupId};
  int32_t putCode = taosHashPut(pUpdatedMap, &winKey, sizeof(winKey), &pEntry, sizeof(void*));
  if (putCode != TSDB_CODE_SUCCESS) {
    // the map did not take the pointer, so it must not leak
    taosMemoryFree(pEntry);
  }

  return TSDB_CODE_SUCCESS;
}

5
54liuyao 已提交
860 861 862 863
// Convenience wrapper around saveWinResult() for callers that have no result
// row position: both the page id and the offset are passed as -1.
static int32_t saveWinResultInfo(TSKEY ts, uint64_t groupId, SHashObj* pUpdatedMap) {
  const int32_t noPageId = -1;
  const int32_t noOffset = -1;
  return saveWinResult(ts, noPageId, noOffset, groupId, pUpdatedMap);
}

5
54liuyao 已提交
864
// For every SWinKey in pWins, drops the matching entry of pUpdatedMap.
// The map stores pointers; the pointed-to object is freed before the entry is
// removed. Keys that are not present in the map are skipped.
static void removeResults(SArray* pWins, SHashObj* pUpdatedMap) {
  int32_t numOfWins = taosArrayGetSize(pWins);

  for (int32_t idx = 0; idx < numOfWins; ++idx) {
    SWinKey* pWinKey = taosArrayGet(pWins, idx);
    void*    pSlot = taosHashGet(pUpdatedMap, pWinKey, sizeof(SWinKey));
    if (pSlot == NULL) {
      continue;
    }

    void* pStored = *(void**)pSlot;
    taosMemoryFree(pStored);
    taosHashRemove(pUpdatedMap, pWinKey, sizeof(SWinKey));
  }
}

5
54liuyao 已提交
877 878
int32_t compareWinRes(void* pKey, void* data, int32_t index) {
  SArray*     res = (SArray*)data;
5
54liuyao 已提交
879 880 881
  SWinKey*    pDataPos = taosArrayGet(res, index);
  SResKeyPos* pRKey = (SResKeyPos*)pKey;
  if (pRKey->groupId > pDataPos->groupId) {
5
54liuyao 已提交
882
    return 1;
5
54liuyao 已提交
883 884
  } else if (pRKey->groupId < pDataPos->groupId) {
    return -1;
5
54liuyao 已提交
885
  }
L
Liu Jicong 已提交
886

5
54liuyao 已提交
887 888
  if (*(int64_t*)pRKey->key > pDataPos->ts) {
    return 1;
L
Liu Jicong 已提交
889
  } else if (*(int64_t*)pRKey->key < pDataPos->ts) {
5
54liuyao 已提交
890 891 892
    return -1;
  }
  return 0;
5
54liuyao 已提交
893 894
}

5
54liuyao 已提交
895
// Removes from pDelWins every window key that also appears in pUpdatedMap.
// pDelWins is first sorted and de-duplicated with winKeyCmprImpl (this happens
// even when nothing is removed afterwards). Each value of pUpdatedMap is a
// pointer to a SResKeyPos, which is looked up in pDelWins through
// binarySearchCom()/compareWinRes(); an exact match is erased from the array.
static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) {
  taosArraySort(pDelWins, winKeyCmprImpl);
  taosArrayRemoveDuplicate(pDelWins, winKeyCmprImpl, NULL);

  int32_t numOfDel = taosArrayGetSize(pDelWins);
  if (taosHashGetSize(pUpdatedMap) == 0 || numOfDel == 0) {
    return;
  }

  for (void* pIter = taosHashIterate(pUpdatedMap, NULL); pIter != NULL; pIter = taosHashIterate(pUpdatedMap, pIter)) {
    SResKeyPos* pResKey = *(SResKeyPos**)pIter;

    int32_t pos = binarySearchCom(pDelWins, numOfDel, pResKey, TSDB_ORDER_DESC, compareWinRes);
    if (pos < 0) {
      continue;
    }
    if (compareWinRes(pResKey, pDelWins, pos) != 0) {
      continue;
    }

    // the window was updated as well: it must not be reported as deleted
    taosArrayRemove(pDelWins, pos);
    numOfDel = taosArrayGetSize(pDelWins);
  }
}

5
54liuyao 已提交
913
// Returns true when ekey lies before (maxTs - waterMark) of pTwSup.
// A maxTs of INT64_MIN is treated as "no maximum recorded yet": nothing is
// overdue in that case. The assertion requires maxTs to be either INT64_MIN or
// strictly positive.
bool isOverdue(TSKEY ekey, STimeWindowAggSupp* pTwSup) {
  ASSERTS(pTwSup->maxTs == INT64_MIN || pTwSup->maxTs > 0, "maxts should greater than 0");

  if (pTwSup->maxTs == INT64_MIN) {
    return false;
  }

  return ekey < pTwSup->maxTs - pTwSup->waterMark;
}

5
54liuyao 已提交
918 919 920 921 922
// A window counts as closed when its end key is overdue according to isOverdue().
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pTwSup) {
  const TSKEY winEnd = pWin->ekey;
  return isOverdue(winEnd, pTwSup);
}

// Returns true when the end key of pWin lies before (maxTs - deleteMark) of
// pTwSup. While maxTs is still INT64_MIN the answer is always false.
bool needDeleteWindowBuf(STimeWindow* pWin, STimeWindowAggSupp* pTwSup) {
  if (pTwSup->maxTs == INT64_MIN) {
    return false;
  }

  return pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark;
}
5
54liuyao 已提交
923

5
54liuyao 已提交
924
// Aggregates one input block into interval windows.
// The window that contains the block's start timestamp is processed first, then
// every further window returned by getNextQualifiedWindow() until it reports a
// negative start position. For each window the result row is bound through
// setTimeWindowOutputBuf(), the rows belonging to the window are handed to
// applyAggFunctionOnPartialTuples(), and doCloseWindow() is called on the row.
// When timeWindowInterpo is set, the first window is additionally registered in
// the open-window list, doInterpUnclosedTimeWindow() is run, and the last row of
// the block is saved at the end for the next call.
// A failure to obtain a result row aborts the task through T_LONG_JMP with
// TSDB_CODE_OUT_OF_MEMORY.
static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
                            int32_t scanFlag) {
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
  SExecTaskInfo*            pTaskInfo = pOperatorInfo->pTaskInfo;
  SExprSupp*                pSup = &pOperatorInfo->exprSupp;

  const bool     isMainScan = (scanFlag == MAIN_SCAN);
  const int32_t  numOfOutput = pSup->numOfExprs;
  int64_t*       tsCols = extractTsCol(pBlock, pInfo);
  const uint64_t tableGroupId = pBlock->info.id.groupId;
  const bool     ascScan = (pInfo->inputOrder == TSDB_ORDER_ASC);
  SResultRow*    pResult = NULL;
  int32_t        startPos = 0;

  // ---- first window: the one covering the start timestamp of the block ----
  TSKEY       firstTs = getStartTsKey(&pBlock->info.window, tsCols);
  STimeWindow win =
      getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, firstTs, &pInfo->interval, pInfo->inputOrder);

  int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &win, isMainScan, &pResult, tableGroupId, pSup->pCtx,
                                        numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
  if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
  }

  // the window boundary that limits the scan depends on the scan direction
  TSKEY   ekey = ascScan ? win.ekey : win.skey;
  int32_t forwardRows =
      getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->inputOrder);
  ASSERT(forwardRows > 0);

  if (pInfo->timeWindowInterpo) {
    // windows left open by previous blocks have not been interpolated yet
    SResultRowPosition openPos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId);
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &openPos);

    // bind the output buffer to the current window again
    code = setTimeWindowOutputBuf(pResultRowInfo, &win, isMainScan, &pResult, tableGroupId, pSup->pCtx, numOfOutput,
                                  pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }

    // interpolate at the start key of the window
    doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup);
  }

  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true);
  applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
                                  pBlock->info.rows, numOfOutput);
  doCloseWindow(pResultRowInfo, pInfo, pResult);

  // ---- remaining windows touched by this block ----
  STimeWindow nextWin = win;
  for (;;) {
    int32_t prevEndPos = startPos + forwardRows - 1;
    startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, pInfo->inputOrder);
    if (startPos < 0) {
      break;
    }

    // a NULL result row means no more buffer memory could be allocated
    code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, isMainScan, &pResult, tableGroupId, pSup->pCtx,
                                  numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }

    ekey = ascScan ? nextWin.ekey : nextWin.skey;
    forwardRows =
        getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->inputOrder);

    // interpolate at the start (end) key of the window
    doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
    // TODO: add to open window? how to close the open windows after input blocks exhausted?

    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
    applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
                                    pBlock->info.rows, numOfOutput);
    doCloseWindow(pResultRowInfo, pInfo, pResult);
  }

  if (pInfo->timeWindowInterpo) {
    // remember the last row so the next block can interpolate against it
    saveDataBlockLastRow(pInfo->pPrevValues, pBlock, pInfo->pInterpCols);
  }
}

// Closes pResult once it is final: only when interpolation is enabled for the
// operator and the row's end has already been interpolated. In that case the
// row is marked closed and the head node of the open-window list is popped and
// freed. Otherwise nothing happens.
void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult) {
  if (!pInfo->timeWindowInterpo) {
    return;
  }
  if (!isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
    return;
  }

  closeResultRow(pResult);

  SListNode* pHead = tdListPopHead(pResultRowInfo->openWindow);
  taosMemoryFree(pHead);
}

1022 1023 1024 1025 1026
// Appends the position of pResult (page id, offset) together with groupId to the
// open-window list, unless the current tail of the list already describes
// exactly the same window. Returns the position of pResult in either case.
SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId) {
  SOpenWindowInfo openWin = {0};
  openWin.groupId = groupId;
  openWin.pos.pageId = pResult->pageId;
  openWin.pos.offset = pResult->offset;

  bool       sameAsTail = false;
  SListNode* pTailNode = tdListGetTail(pResultRowInfo->openWindow);
  if (pTailNode != NULL) {
    const SOpenWindowInfo* pTail = (const SOpenWindowInfo*)pTailNode->data;
    sameAsTail = (pTail->pos.pageId == openWin.pos.pageId) && (pTail->pos.offset == openWin.pos.offset) &&
                 (pTail->groupId == openWin.groupId);
  }

  if (!sameAsTail) {
    tdListAppend(pResultRowInfo->openWindow, &openWin);
  }

  return openWin.pos;
}

// Returns the data of the primary timestamp column of pBlock, or NULL when the
// block has no column array, its data is not loaded, or both the first and the
// last timestamp are 0.
// Side effect: when the first timestamp is non-zero but the block's time window
// is still {0, 0}, the window is recomputed with blockDataUpdateTsWindow().
int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo) {
  if (pBlock->pDataBlock == NULL || !pBlock->info.dataLoad) {
    return NULL;
  }

  SColumnInfoData* pTsColumn = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
  TSKEY*           tsCols = (int64_t*)pTsColumn->pData;
  ASSERT(tsCols[0] != 0);

  const bool firstIsZero = (tsCols[0] == 0);

  // no data in primary ts
  if (firstIsZero && tsCols[pBlock->info.rows - 1] == 0) {
    return NULL;
  }

  const bool windowUnset = (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0);
  if (!firstIsZero && windowUnset) {
    blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
  }

  return tsCols;
}

// Opens the interval aggregate operator: drains the downstream operator and
// aggregates every block into interval windows with hashIntervalAgg(), then
// prepares the grouped result info for output and marks the operator as opened.
// Calling it again on an already opened operator is a no-op.
//
// When scalar expressions are configured they are evaluated in place on each
// block before aggregation. A failure of that evaluation is stored in
// pTaskInfo->code and aborts the task through T_LONG_JMP, the same way
// openStateWindowAggOptr() handles it, instead of aggregating a block whose
// expression columns were not computed.
//
// Returns TSDB_CODE_SUCCESS; errors leave the function through T_LONG_JMP.
static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SOperatorInfo* downstream = pOperator->pDownstream[0];

  SIntervalAggOperatorInfo* pInfo = pOperator->info;
  SExprSupp*                pSup = &pOperator->exprSupp;

  int32_t scanFlag = MAIN_SCAN;
  int64_t st = taosGetTimestampUs();

  while (1) {
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
    if (pBlock == NULL) {
      break;
    }

    // refresh the scan order / scan flag reported for this block
    getTableScanInfo(pOperator, &pInfo->inputOrder, &scanFlag);

    if (pInfo->scalarSupp.pExprInfo != NULL) {
      SExprSupp* pExprSup = &pInfo->scalarSupp;
      pTaskInfo->code =
          projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
      if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
      }
    }

    // the pDataBlock are always the same one, no need to call this again
    setInputDataBlock(pSup, pBlock, pInfo->inputOrder, scanFlag, true);
    hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag);
  }

  initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->resultTsOrder);
  OPTR_SET_OPENED(pOperator);

  // elapsed time is measured in microseconds and stored divided by 1000
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
  return TSDB_CODE_SUCCESS;
}

1101 1102 1103 1104 1105
// Returns true when the value v equals the state key held in pKey.
// Variable-length types are equal when their lengths match and their payloads
// are byte-identical; all other types are compared over pKey->bytes bytes.
static bool compareVal(const char* v, const SStateKeys* pKey) {
  if (!IS_VAR_DATA_TYPE(pKey->type)) {
    return memcmp(pKey->pData, v, pKey->bytes) == 0;
  }

  if (varDataLen(v) != varDataLen(pKey->pData)) {
    return false;
  }

  return memcmp(varDataVal(v), varDataVal(pKey->pData), varDataLen(v)) == 0;
}

1113
// Splits one input block into state windows and aggregates them.
// Rows whose state column is NULL are skipped. For every other row:
//   - if the block belongs to another group than the one tracked in winSup, or
//     no state key has been stored yet, the row's value becomes the state key
//     and a new window starts at this row;
//   - if the value equals the stored state key, the row joins the current window;
//   - otherwise the current window is aggregated into its result row and a new
//     window starts at this row with the new value as state key.
// After the loop, the window that is still open is aggregated as well, with its
// end key set to the last timestamp of the block.
// A failing setTimeWindowOutputBuf() aborts the task through T_LONG_JMP with
// TSDB_CODE_APP_ERROR.
static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SExprSupp*     pSup = &pOperator->exprSupp;

  SColumnInfoData* pStateCol = taosArrayGet(pBlock->pDataBlock, pInfo->stateCol.slotId);
  const int64_t    gid = pBlock->info.id.groupId;

  const bool    masterScan = true;
  const int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
  const int16_t bytes = pStateCol->info.bytes;

  SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
  TSKEY*           tsList = (TSKEY*)pTsCol->pData;

  SWindowRowsSup* pRowSup = &pInfo->winSup;
  pRowSup->numOfRows = 0;

  for (int32_t row = 0; row < pBlock->info.rows; ++row) {
    struct SColumnDataAgg* pAgg = NULL;
    if (pBlock->pBlockAgg != NULL) {
      pAgg = pBlock->pBlockAgg[pInfo->stateCol.slotId];
    }

    if (colDataIsNull(pStateCol, pBlock->info.rows, row, pAgg)) {
      continue;
    }

    char* val = colDataGetData(pStateCol, row);

    // case 1: first usable row of a group, or no state key stored so far
    if (gid != pRowSup->groupId || !pInfo->hasKey) {
      // todo extract method
      if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) {
        varDataCopy(pInfo->stateKey.pData, val);
      } else {
        memcpy(pInfo->stateKey.pData, val, bytes);
      }
      pInfo->hasKey = true;

      doKeepNewWindowStartInfo(pRowSup, tsList, row, gid);
      doKeepTuple(pRowSup, tsList[row], gid);
      continue;
    }

    // case 2: same state as before, the row extends the current window
    if (compareVal(val, &pInfo->stateKey)) {
      doKeepTuple(pRowSup, tsList[row], gid);
      if (row == 0 && pRowSup->startRowIndex != 0) {
        // the window continues from a previous block: restart at row 0 of this one
        pRowSup->startRowIndex = 0;
      }
      continue;
    }

    // case 3: the state changed, so a new state window starts here
    SResultRow* pClosedRes = NULL;

    // keep the time window for the closed time window.
    STimeWindow closedWin = pRowSup->win;
    pRowSup->win.ekey = pRowSup->win.skey;

    int32_t code = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &closedWin, masterScan, &pClosedRes, gid,
                                          pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
    if (code != TSDB_CODE_SUCCESS) {  // null data, too many state code
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
    }

    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &closedWin, false);
    applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
                                    pRowSup->numOfRows, pBlock->info.rows, numOfOutput);

    // here we start a new session window
    doKeepNewWindowStartInfo(pRowSup, tsList, row, gid);
    doKeepTuple(pRowSup, tsList[row], gid);

    // todo extract method
    if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) {
      varDataCopy(pInfo->stateKey.pData, val);
    } else {
      memcpy(pInfo->stateKey.pData, val, bytes);
    }
  }

  // aggregate the window that is still open at the end of the block
  SResultRow* pLastRes = NULL;
  pRowSup->win.ekey = tsList[pBlock->info.rows - 1];

  int32_t code = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pLastRes, gid,
                                        pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
  if (code != TSDB_CODE_SUCCESS) {  // null data, too many state code
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
  }

  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false);
  applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
                                  pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
}

H
Hongze Cheng 已提交
1199
// Opens the state window operator: reads every block from the downstream
// operator, refreshes the block's timestamp window, evaluates the optional
// scalar expressions in place and feeds the block to doStateWindowAggImpl().
// Afterwards the grouped result info is initialised in ascending order and the
// operator status becomes OP_RES_TO_RETURN.
// A failing scalar evaluation is stored in pTaskInfo->code and aborts the task
// through T_LONG_JMP. Returns TSDB_CODE_SUCCESS otherwise; an already opened
// operator returns immediately.
static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  SStateWindowOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
  SExprSupp*                pAggSup = &pOperator->exprSupp;
  const int64_t             startUs = taosGetTimestampUs();
  SOperatorInfo*            pChild = pOperator->pDownstream[0];

  for (;;) {
    SSDataBlock* pBlock = pChild->fpSet.getNextFn(pChild);
    if (pBlock == NULL) {
      break;
    }

    setInputDataBlock(pAggSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
    blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);

    // scalar expressions have to be computed before the group aggregation is applied
    if (pInfo->scalarSup.pExprInfo != NULL) {
      pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
                                              pInfo->scalarSup.numOfExprs, NULL);
      if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
      }
    }

    doStateWindowAggImpl(pOperator, pInfo, pBlock);
  }

  // elapsed time is measured in microseconds and stored divided by 1000
  pOperator->cost.openCost = (taosGetTimestampUs() - startUs) / 1000.0;
  initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
  pOperator->status = OP_RES_TO_RETURN;

  return TSDB_CODE_SUCCESS;
}

// Produces the next result block of the state window operator.
// The operator is opened through its _openFn first; if that fails the operator
// is marked completed and NULL is returned. Result blocks are then built and
// filtered repeatedly until either a non-empty block is available or no results
// remain (in which case the operator is marked completed).
// Returns the result block, or NULL when it holds no rows or the operator has
// already finished.
static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SStateWindowOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
  SOptrBasicInfo*           pBInfo = &pInfo->binfo;

  pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
    setOperatorCompleted(pOperator);
    return NULL;
  }

  blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);

  for (;;) {
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
    doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);

    if (!hasRemainResults(&pInfo->groupResInfo)) {
      // everything has been emitted
      setOperatorCompleted(pOperator);
      break;
    }

    // the filter may have dropped every row; keep going until something is left
    if (pBInfo->pRes->info.rows > 0) {
      break;
    }
  }

  pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
  if (pBInfo->pRes->info.rows == 0) {
    return NULL;
  }
  return pBInfo->pRes;
}

1275
// Produces the next result block of the interval operator.
// The operator is opened through its _openFn first; NULL is returned if that
// fails. Result blocks are then built and filtered repeatedly until either a
// non-empty block is available or no results remain (in which case the operator
// is marked completed).
// Returns the result block, or NULL when it holds no rows or the operator has
// already finished.
static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
  SIntervalAggOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;

  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SSDataBlock* pRes = pInfo->binfo.pRes;

  pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
    return NULL;
  }

  for (;;) {
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
    doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);

    if (!hasRemainResults(&pInfo->groupResInfo)) {
      // everything has been emitted
      setOperatorCompleted(pOperator);
      break;
    }

    // the filter may have dropped every row; keep going until something is left
    if (pRes->info.rows > 0) {
      break;
    }
  }

  size_t rows = pRes->info.rows;
  pOperator->resultInfo.totalRows += rows;

  if (rows == 0) {
    return NULL;
  }
  return pRes;
}

5
54liuyao 已提交
1310
// Switches the function pointer set of each of the `num` contexts in pCtx:
// STREAM_INVERT installs the invert functions, STREAM_NORMAL the normal ones.
// Any other stream type leaves the contexts untouched.
static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type) {
  for (int idx = 0; idx < num; ++idx) {
    SqlFunctionCtx* pFuncCtx = &pCtx[idx];

    switch (type) {
      case STREAM_INVERT:
        fmSetInvertFunc(pFuncCtx->functionId, &(pFuncCtx->fpSet));
        break;
      case STREAM_NORMAL:
        fmSetNormalFunc(pFuncCtx->functionId, &(pFuncCtx->fpSet));
        break;
      default:
        break;
    }
  }
}
5
54liuyao 已提交
1319

5
54liuyao 已提交
1320
// Resets the result row located at p1 in pResultBuf.
// For each of the numOfOutput function contexts the result entry of the row is
// attached to the context; unless the function is a window pseudo column, the
// entry is marked uninitialised and, for contexts with a function id other than
// -1, re-initialised through the context's init callback.
// Finally the buffer page holding the row is marked dirty and released.
// Nothing happens when the row cannot be obtained; the page is left untouched
// when it cannot be obtained.
static void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprSupp* pSup, int32_t numOfOutput) {
  SResultRow* pRow = getResultRowByPos(pResultBuf, p1, false);
  if (pRow == NULL) {
    return;
  }

  SqlFunctionCtx* pCtx = pSup->pCtx;
  for (int32_t idx = 0; idx < numOfOutput; ++idx) {
    SqlFunctionCtx*             pFuncCtx = &pCtx[idx];
    struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, idx, pSup->rowEntryInfoOffset);
    pFuncCtx->resultInfo = pEntry;

    if (!fmIsWindowPseudoColumnFunc(pFuncCtx->functionId)) {
      pEntry->initialized = false;
      if (pFuncCtx->functionId != -1) {
        pFuncCtx->fpSet.init(pFuncCtx, pEntry);
      }
    }
  }

  SFilePage* pPage = getBufPage(pResultBuf, p1->pageId);
  if (pPage != NULL) {
    setBufPageDirty(pPage, true);
    releaseBufPage(pResultBuf, pPage);
  }
}

1346
// Deletes the window (ts, groupId) of a stream interval operator: the entry is
// removed from the in-memory result row hash table and from the stream state.
// The results of both removals are ignored; the function always returns true.
static bool doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId) {
  SStreamIntervalOperatorInfo* pInfo = pOperator->info;
  SWinKey                      winKey = {.ts = ts, .groupId = groupId};

  tSimpleHashRemove(pInfo->aggSup.pResultRowHashTable, &winKey, sizeof(winKey));
  streamStateDel(pInfo->pState, &winKey);

  return true;
}

1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366
// Processes a delete block: every row carries a start/end timestamp, a
// calculation start/end timestamp and a group id.
// For each row the first window is either taken verbatim from the row (final
// operator) or derived from the start timestamp with getActiveTimeWindow(); the
// windows are then walked forward with getNextTimeWindow() for as long as the
// window's end key does not exceed the row's end timestamp (the first window is
// always visited). Each visited window accepted by inCalSlidingWindow() is
// deleted with doDeleteWindow(), appended to pUpWins (if given) and removed,
// together with its stored pointer, from pUpdatedMap (if given).
static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDataBlock* pBlock, SArray* pUpWins,
                            SHashObj* pUpdatedMap) {
  SStreamIntervalOperatorInfo* pInfo = pOperator->info;

  SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pCalStTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEnTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);

  TSKEY*    startTsCols = (TSKEY*)pStartTsCol->pData;
  TSKEY*    endTsCols = (TSKEY*)pEndTsCol->pData;
  TSKEY*    calStTsCols = (TSKEY*)pCalStTsCol->pData;
  TSKEY*    calEnTsCols = (TSKEY*)pCalEnTsCol->pData;
  uint64_t* pGpDatas = (uint64_t*)pGpCol->pData;

  for (int32_t row = 0; row < pBlock->info.rows; row++) {
    // throw-away row info, only needed as an argument of getActiveTimeWindow()
    SResultRowInfo dumyInfo = {0};
    dumyInfo.cur.pageId = -1;

    STimeWindow win = {0};
    if (IS_FINAL_OP(pInfo)) {
      win.skey = startTsCols[row];
      win.ekey = endTsCols[row];
    } else {
      win = getActiveTimeWindow(NULL, &dumyInfo, startTsCols[row], pInterval, TSDB_ORDER_ASC);
    }

    do {
      if (inCalSlidingWindow(pInterval, &win, calStTsCols[row], calEnTsCols[row])) {
        uint64_t winGpId = pGpDatas[row];
        bool     deleted = doDeleteWindow(pOperator, win.skey, winGpId);
        SWinKey  winRes = {.ts = win.skey, .groupId = winGpId};

        if (pUpWins && deleted) {
          taosArrayPush(pUpWins, &winRes);
        }

        if (pUpdatedMap) {
          void* pSlot = taosHashGet(pUpdatedMap, &winRes, sizeof(SWinKey));
          if (pSlot) {
            // the map stores a pointer; release the object before dropping the entry
            void* pStored = *(void**)pSlot;
            taosMemoryFree(pStored);
            taosHashRemove(pUpdatedMap, &winRes, sizeof(SWinKey));
          }
        }
      }

      getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win);
    } while (win.ekey <= endTsCols[row]);
  }
}

1403 1404 1405 1406 1407
// Copies every entry of pHashMap into resWins via saveWinResult().
// Each key is read as a uint64_t group id immediately followed by an int64_t
// timestamp, and each value as a SResultRowPosition.
// Stops at, and returns, the first non-success code of saveWinResult();
// returns TSDB_CODE_SUCCESS otherwise.
static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) {
  size_t  keyLen = 0;
  int32_t iter = 0;
  void*   pNode = NULL;

  for (pNode = tSimpleHashIterate(pHashMap, pNode, &iter); pNode != NULL;
       pNode = tSimpleHashIterate(pHashMap, pNode, &iter)) {
    void*    pRawKey = tSimpleHashGetKey(pNode, &keyLen);
    uint64_t groupId = *(uint64_t*)pRawKey;
    TSKEY    ts = *(int64_t*)((char*)pRawKey + sizeof(uint64_t));

    SResultRowPosition* pPos = (SResultRowPosition*)pNode;

    int32_t code = saveWinResult(ts, pPos->pageId, pPos->offset, groupId, resWins);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}

1420 1421
int32_t compareWinKey(void* pKey, void* data, int32_t index) {
  SArray*  res = (SArray*)data;
5
54liuyao 已提交
1422 1423 1424 1425
  SWinKey* pDataPos = taosArrayGet(res, index);
  SWinKey* pWKey = (SWinKey*)pKey;

  if (pWKey->groupId > pDataPos->groupId) {
1426
    return 1;
5
54liuyao 已提交
1427 1428
  } else if (pWKey->groupId < pDataPos->groupId) {
    return -1;
1429
  }
5
54liuyao 已提交
1430 1431 1432 1433 1434 1435 1436

  if (pWKey->ts > pDataPos->ts) {
    return 1;
  } else if (pWKey->ts < pDataPos->ts) {
    return -1;
  }
  return 0;
1437 1438
}

5
54liuyao 已提交
1439
// Walks every window of pHashMap and closes those that isCloseWindow() reports
// as closed for pTwSup.
//
// For every window, closed or not, a matching key is first erased from pDelWins
// (when that array is not empty).
// A closed window is then handled as follows:
//   - if pPullDataMap is given and still holds an entry for the window, the
//     window is left in place (only debug output is written);
//   - otherwise, when the trigger is STREAM_TRIGGER_WINDOW_CLOSE, the window is
//     recorded in closeWins through saveWinResultInfo(), and in all cases it is
//     removed from pHashMap.
//
// pOperator is accepted for interface compatibility and is not used.
// Returns TSDB_CODE_SUCCESS, or the first error code of saveWinResultInfo().
static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SInterval* pInterval,
                                         SHashObj* pPullDataMap, SHashObj* closeWins, SArray* pDelWins,
                                         SOperatorInfo* pOperator) {
  (void)pOperator;

  qDebug("===stream===close interval window");
  void*   pIte = NULL;
  size_t  keyLen = 0;
  int32_t iter = 0;
  int32_t delSize = taosArrayGetSize(pDelWins);

  while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
    void*    key = tSimpleHashGetKey(pIte, &keyLen);
    SWinKey* pWinKey = (SWinKey*)key;

    if (delSize > 0) {
      int32_t index = binarySearchCom(pDelWins, delSize, pWinKey, TSDB_ORDER_DESC, compareWinKey);
      if (index >= 0 && 0 == compareWinKey(pWinKey, pDelWins, index)) {
        taosArrayRemove(pDelWins, index);
        delSize = taosArrayGetSize(pDelWins);
      }
    }

    // The end key is derived from skey in a separate statement: a member of an
    // object must not be read inside that object's own initializer list, because
    // the initializer expressions are not sequenced with respect to each other.
    STimeWindow win = {.skey = pWinKey->ts};
    win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;

    if (!isCloseWindow(&win, pTwSup)) {
      continue;
    }

    if (pPullDataMap != NULL) {
      // only consult the map after it is known to exist
      void* chIds = taosHashGet(pPullDataMap, pWinKey, sizeof(SWinKey));
      if (chIds != NULL) {
        SArray* chAy = *(SArray**)chIds;
        int32_t size = taosArrayGetSize(chAy);
        qDebug("===stream===window %" PRId64 " wait child size:%d", pWinKey->ts, size);
        for (int32_t i = 0; i < size; i++) {
          qDebug("===stream===window %" PRId64 " wait child id:%d", pWinKey->ts, *(int32_t*)taosArrayGet(chAy, i));
        }
        continue;
      }
      qDebug("===stream===close window %" PRId64, pWinKey->ts);
    }

    if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
      int32_t code = saveWinResultInfo(pWinKey->ts, pWinKey->groupId, closeWins);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }

    // removes the current entry and updates the iteration cursor
    tSimpleHashIterateRemove(pHashMap, pWinKey, sizeof(SWinKey), &pIte, &iter);
  }

  return TSDB_CODE_SUCCESS;
}

// Builds the window that starts at ts and spans one interval of pInterval:
// skey is ts, ekey is (ts advanced by the interval via taosTimeAdd()) - 1.
STimeWindow getFinalTimeWindow(int64_t ts, SInterval* pInterval) {
  STimeWindow finalWin = {0};

  finalWin.skey = ts;
  finalWin.ekey =
      taosTimeAdd(finalWin.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;

  return finalWin;
}

// Delete windows from the stream state, starting at *key, while the window end is older than `mark`.
//
// pState        stream state that stores the windows
// pPullDataMap  windows that are still waiting for child data (may be NULL); a window found here is
//               not deleted and terminates the scan
// mark          windows whose end key is smaller than this value are removed
// pInterval     interval definition used to derive the window end from the key timestamp
// key           in/out: first key to examine; updated as the scan advances
static void deleteIntervalDiscBuf(SStreamState* pState, SHashObj* pPullDataMap, TSKEY mark, SInterval* pInterval,
                                  SWinKey* key) {
  STimeWindow tw = getFinalTimeWindow(key->ts, pInterval);
  SWinKey     next = {0};
  while (tw.ekey < mark) {
    // fetch the successor first, the current key is about to be removed
    SStreamStateCur* pCur = streamStateSeekKeyNext(pState, key);
    int32_t          code = streamStateGetKVByCur(pCur, &next, NULL, 0);
    streamStateFreeCur(pCur);

    // only consult the map when there is one
    void* chIds = (pPullDataMap != NULL) ? taosHashGet(pPullDataMap, key, sizeof(SWinKey)) : NULL;
    if (chIds != NULL) {
      SArray* chAy = *(SArray**)chIds;
      int32_t size = taosArrayGetSize(chAy);
      qDebug("===stream===window %" PRId64 " wait child size:%d", key->ts, size);
      for (int32_t i = 0; i < size; i++) {
        qDebug("===stream===window %" PRId64 " wait child id:%d", key->ts, *(int32_t*)taosArrayGet(chAy, i));
      }
      break;
    }

    qDebug("===stream===delete window %" PRId64, key->ts);
    int32_t codeDel = streamStateDel(pState, key);
    if (codeDel != TSDB_CODE_SUCCESS) {
      // deletion failed: restart from the first key of the state, or stop if there is none
      code = streamStateGetFirst(pState, key);
      if (code != TSDB_CODE_SUCCESS) {
        qDebug("===stream===stream state first key: empty-empty");
        return;
      }
      continue;
    }

    if (code == TSDB_CODE_SUCCESS) {
      *key = next;
      tw = getFinalTimeWindow(key->ts, pInterval);
    }
  }

  // for debug
  if ((qDebugFlag & DEBUG_DEBUG) && mark > 0) {
    SStreamStateCur* pCur = streamStateGetCur(pState, key);
    SWinKey          prevKey = {0};

    // a readable entry before `key` is reported with the "error" message
    bool hasPrev = (streamStateCurPrev(pState, pCur) == TSDB_CODE_SUCCESS) &&
                   (streamStateGetKVByCur(pCur, &prevKey, NULL, 0) == TSDB_CODE_SUCCESS);
    if (hasPrev) {
      STimeWindow prevWin = getFinalTimeWindow(prevKey.ts, pInterval);
      qDebug("===stream===error stream state first key:%" PRId64 "-%" PRId64 ",%" PRId64 ",mark %" PRId64,
             prevWin.skey, prevWin.ekey, prevKey.groupId, mark);
    } else {
      STimeWindow firstWin = getFinalTimeWindow(key->ts, pInterval);
      qDebug("===stream===stream state first key:%" PRId64 "-%" PRId64 ",%" PRId64 ",mark %" PRId64, firstWin.skey,
             firstWin.ekey, key->groupId, mark);
    }
    streamStateFreeCur(pCur);
  }
}

1555
// Raise the max timestamp of every child interval operator to at least `maxTs` and close its windows.
// Each entry of pChildren is an operator whose info is a SStreamIntervalOperatorInfo.
static void closeChildIntervalWindow(SOperatorInfo* pOperator, SArray* pChildren, TSKEY maxTs) {
  int32_t numOfChildren = taosArrayGetSize(pChildren);
  for (int32_t idx = 0; idx < numOfChildren; ++idx) {
    SOperatorInfo*               pChild = taosArrayGetP(pChildren, idx);
    SStreamIntervalOperatorInfo* pChInfo = pChild->info;

    ASSERTS(pChInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE, "children trigger type should be at once");

    pChInfo->twAggSup.maxTs = TMAX(pChInfo->twAggSup.maxTs, maxTs);
    // no pull map, close list or delete list is passed for children
    closeStreamIntervalWindow(pChInfo->aggSup.pResultRowHashTable, &pChInfo->twAggSup, &pChInfo->interval, NULL, NULL,
                              NULL, pOperator);
  }
}

1567 1568
// Fill pBlock with one row per window key in pWins, starting at position *index.
// When *index already equals the number of keys, the cursor is reset to 0, pWins is cleared and
// pBlock is left empty. Otherwise *index is advanced past every emitted key.
static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWins, int32_t* index,
                                SSDataBlock* pBlock) {
  blockDataCleanup(pBlock);

  int32_t numOfWins = taosArrayGetSize(pWins);
  if (numOfWins == *index) {
    *index = 0;
    taosArrayClear(pWins);
    return;
  }

  blockDataEnsureCapacity(pBlock, numOfWins - *index);

  uint64_t uid = 0;
  while (*index < numOfWins) {
    SWinKey* pWin = taosArrayGet(pWins, *index);
    void*    tbname = NULL;
    streamStateGetParName(pInfo->pState, pWin->groupId, &tbname);

    // convert the name (when there is one) into a var-string buffer
    char  parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN];
    char* pName = NULL;
    if (tbname != NULL) {
      STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
      pName = parTbName;
    }
    appendOneRowToStreamSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, pName);

    tdbFree(tbname);
    (*index)++;
  }
}

1594
static void destroyStateWindowOperatorInfo(void* param) {
1595
  SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
1596
  cleanupBasicInfo(&pInfo->binfo);
1597
  taosMemoryFreeClear(pInfo->stateKey.pData);
1598
  cleanupExprSupp(&pInfo->scalarSup);
D
dapan1121 已提交
1599 1600 1601
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
  cleanupAggSup(&pInfo->aggSup);
  cleanupGroupResInfo(&pInfo->groupResInfo);
1602

D
dapan1121 已提交
1603
  taosMemoryFreeClear(param);
1604 1605
}

H
Haojun Liao 已提交
1606
static void freeItem(void* param) {
L
Liu Jicong 已提交
1607
  SGroupKeys* pKey = (SGroupKeys*)param;
H
Haojun Liao 已提交
1608 1609 1610
  taosMemoryFree(pKey->pData);
}

1611
void destroyIntervalOperatorInfo(void* param) {
1612
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param;
1613
  cleanupBasicInfo(&pInfo->binfo);
1614
  cleanupAggSup(&pInfo->aggSup);
1615 1616 1617 1618
  cleanupExprSupp(&pInfo->scalarSupp);

  tdListFree(pInfo->binfo.resultRowInfo.openWindow);

H
Haojun Liao 已提交
1619 1620 1621 1622
  pInfo->pInterpCols = taosArrayDestroy(pInfo->pInterpCols);
  taosArrayDestroyEx(pInfo->pPrevValues, freeItem);

  pInfo->pPrevValues = NULL;
1623

H
Haojun Liao 已提交
1624 1625
  cleanupGroupResInfo(&pInfo->groupResInfo);
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
D
dapan1121 已提交
1626
  taosMemoryFreeClear(param);
1627 1628
}

1629
void destroyStreamFinalIntervalOperatorInfo(void* param) {
1630
  SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)param;
1631
  cleanupBasicInfo(&pInfo->binfo);
5
54liuyao 已提交
1632
  cleanupAggSup(&pInfo->aggSup);
L
Liu Jicong 已提交
1633
  // it should be empty.
5
54liuyao 已提交
1634 1635 1636
  taosHashCleanup(pInfo->pPullDataMap);
  taosArrayDestroy(pInfo->pPullWins);
  blockDataDestroy(pInfo->pPullDataRes);
L
Liu Jicong 已提交
1637 1638
  taosArrayDestroy(pInfo->pDelWins);
  blockDataDestroy(pInfo->pDelRes);
1639
  taosMemoryFreeClear(pInfo->pState);
5
54liuyao 已提交
1640

1641 1642 1643 1644
  if (pInfo->pChildren) {
    int32_t size = taosArrayGetSize(pInfo->pChildren);
    for (int32_t i = 0; i < size; i++) {
      SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, i);
5
54liuyao 已提交
1645
      destroyOperatorInfo(pChildOp);
1646
    }
L
Liu Jicong 已提交
1647
    taosArrayDestroy(pInfo->pChildren);
1648
  }
1649
  nodesDestroyNode((SNode*)pInfo->pPhyNode);
5
54liuyao 已提交
1650
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
5
54liuyao 已提交
1651
  cleanupGroupResInfo(&pInfo->groupResInfo);
5
54liuyao 已提交
1652
  cleanupExprSupp(&pInfo->scalarSupp);
1653

D
dapan1121 已提交
1654
  taosMemoryFreeClear(param);
5
54liuyao 已提交
1655 1656
}

1657
// Return true only when none of the first numOfCols functions is user-defined and every one of them
// is reported as invertible by the function manager.
static bool allInvertible(SqlFunctionCtx* pFCtx, int32_t numOfCols) {
  for (int32_t col = 0; col < numOfCols; ++col) {
    bool invertibleBuiltin = !fmIsUserDefinedFunc(pFCtx[col].functionId) && fmIsInvertible(pFCtx[col].functionId);
    if (!invertibleBuiltin) {
      return false;
    }
  }
  return true;
}

1666
// Decide whether interval interpolation is required and, if so, prepare the bookkeeping for it.
//
// Returns true when at least one of the numOfCols functions is an interval-interpolation function.
// In that case pInfo->pInterpCols / pInfo->pPrevValues are created and filled with the primary
// timestamp column first, followed by the first parameter column of every interpolation function.
static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SIntervalAggOperatorInfo* pInfo) {
  bool needed = false;

  for (int32_t i = 0; i < numOfCols; ++i) {
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
      needed = true;
      break;
    }
  }

  if (needed) {
    pInfo->pInterpCols = taosArrayInit(4, sizeof(SColumn));
    pInfo->pPrevValues = taosArrayInit(4, sizeof(SGroupKeys));

    {  // ts column
      SColumn c = {0};
      c.colId = 1;
      c.slotId = pInfo->primaryTsIndex;
      c.type = TSDB_DATA_TYPE_TIMESTAMP;
      c.bytes = sizeof(int64_t);
      taosArrayPush(pInfo->pInterpCols, &c);

      // zero-initialized like the per-function keys below, so no indeterminate bytes are copied
      SGroupKeys key = {0};
      key.bytes = c.bytes;
      key.type = c.type;
      key.isNull = true;  // to denote no value is assigned yet
      key.pData = taosMemoryCalloc(1, c.bytes);
      taosArrayPush(pInfo->pPrevValues, &key);
    }
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    SExprInfo* pExpr = pCtx[i].pExpr;

    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
      SFunctParam* pParam = &pExpr->base.pParam[0];

      SColumn c = *pParam->pCol;
      taosArrayPush(pInfo->pInterpCols, &c);

      SGroupKeys key = {0};
      key.bytes = c.bytes;
      key.type = c.type;
      key.isNull = false;
      key.pData = taosMemoryCalloc(1, c.bytes);
      taosArrayPush(pInfo->pPrevValues, &key);
    }
  }

  return needed;
}

L
Liu Jicong 已提交
1720
void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SAggSupporter* pSup, SInterval* pInterval,
5
54liuyao 已提交
1721
                            STimeWindowAggSupp* pTwSup) {
1722
  if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
5
54liuyao 已提交
1723
    initIntervalDownStream(downstream->pDownstream[0], type, pSup, pInterval, pTwSup);
1724 1725
    return;
  }
5
54liuyao 已提交
1726
  SStreamScanInfo* pScanInfo = downstream->info;
1727 1728
  pScanInfo->windowSup.parentType = type;
  pScanInfo->windowSup.pIntervalAggSup = pSup;
5
54liuyao 已提交
1729 1730 1731
  if (!pScanInfo->pUpdateInfo) {
    pScanInfo->pUpdateInfo = updateInfoInitP(pInterval, pTwSup->waterMark);
  }
1732
  pScanInfo->interval = *pInterval;
5
54liuyao 已提交
1733
  pScanInfo->twAggSup = *pTwSup;
5
54liuyao 已提交
1734 1735
}

H
Haojun Liao 已提交
1736 1737
// Currently a no-op: the per-context assignment this function used to perform
// (pCtx[i].isStream = true) is disabled.
void initStreamFunciton(SqlFunctionCtx* pCtx, int32_t numOfExpr) {
  (void)pCtx;
  (void)numOfExpr;
}

H
Haojun Liao 已提交
1742
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode,
L
Liu Jicong 已提交
1743
                                          SExecTaskInfo* pTaskInfo, bool isStream) {
1744
  SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
L
Liu Jicong 已提交
1745
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
1746 1747 1748 1749
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

H
Haojun Liao 已提交
1750
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->window.node.pOutputDataBlockDesc);
H
Haojun Liao 已提交
1751 1752 1753 1754 1755 1756
  initBasicInfo(&pInfo->binfo, pResBlock);

  SExprSupp* pSup = &pOperator->exprSupp;
  pInfo->primaryTsIndex = ((SColumnNode*)pPhyNode->window.pTspk)->slotId;

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
H
Haojun Liao 已提交
1757 1758
  initResultSizeInfo(&pOperator->resultInfo, 512);
  blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
H
Haojun Liao 已提交
1759 1760 1761

  int32_t    num = 0;
  SExprInfo* pExprInfo = createExprInfo(pPhyNode->window.pFuncs, NULL, &num);
L
Liu Jicong 已提交
1762 1763
  int32_t    code =
      initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState);
H
Haojun Liao 已提交
1764 1765 1766 1767 1768
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  SInterval interval = {.interval = pPhyNode->interval,
1769 1770 1771 1772 1773
                        .sliding = pPhyNode->sliding,
                        .intervalUnit = pPhyNode->intervalUnit,
                        .slidingUnit = pPhyNode->slidingUnit,
                        .offset = pPhyNode->offset,
                        .precision = ((SColumnNode*)pPhyNode->window.pTspk)->node.resType.precision};
H
Haojun Liao 已提交
1774 1775 1776 1777 1778 1779 1780

  STimeWindowAggSupp as = {
      .waterMark = pPhyNode->window.watermark,
      .calTrigger = pPhyNode->window.triggerType,
      .maxTs = INT64_MIN,
  };

L
Liu Jicong 已提交
1781
  pInfo->win = pTaskInfo->window;
1782 1783
  pInfo->inputOrder = (pPhyNode->window.inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
  pInfo->resultTsOrder = (pPhyNode->window.outputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
H
Haojun Liao 已提交
1784 1785
  pInfo->interval = interval;
  pInfo->twAggSup = as;
1786
  pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock;
1787 1788 1789 1790

  if (pPhyNode->window.pExprs != NULL) {
    int32_t    numOfScalar = 0;
    SExprInfo* pScalarExprInfo = createExprInfo(pPhyNode->window.pExprs, NULL, &numOfScalar);
H
Haojun Liao 已提交
1791
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar);
1792 1793 1794 1795
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }
1796

H
Haojun Liao 已提交
1797 1798 1799 1800 1801
  code = filterInitFromNode((SNode*)pPhyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

1802
  initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
H
Haojun Liao 已提交
1803
  pInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, num, pInfo);
1804
  if (pInfo->timeWindowInterpo) {
1805
    pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
H
Haojun Liao 已提交
1806 1807 1808
    if (pInfo->binfo.resultRowInfo.openWindow == NULL) {
      goto _error;
    }
1809
  }
1810

1811
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
L
Liu Jicong 已提交
1812 1813
  setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
1814

L
Liu Jicong 已提交
1815 1816
  pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, destroyIntervalOperatorInfo,
                                         optrDefaultBufFn, NULL);
1817 1818 1819 1820 1821 1822 1823 1824

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

L
Liu Jicong 已提交
1825
_error:
H
Haojun Liao 已提交
1826 1827 1828
  if (pInfo != NULL) {
    destroyIntervalOperatorInfo(pInfo);
  }
1829 1830 1831 1832 1833
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

1834
// todo handle multiple timeline cases. assume no timeline interweaving
1835 1836
// Apply the aggregate functions of a session-window operator to one input block.
//
// Rows are walked in block order. A row joins the open session when it has the same group id and
// its timestamp is within `gap` of the previous one (in either direction); otherwise the open
// session is flushed to its result row and a new session starts at that row. The session still
// open at the end of the block is flushed with the last row's timestamp as its end key.
// An empty block is ignored.
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) {
  // nothing to aggregate, and the tail flush below would read tsList[-1]
  if (pBlock->info.rows <= 0) {
    return;
  }

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SExprSupp*     pSup = &pOperator->exprSupp;

  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);

  bool    masterScan = true;
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
  int64_t gid = pBlock->info.id.groupId;

  int64_t gap = pInfo->gap;

  if (!pInfo->reptScan) {
    pInfo->reptScan = true;
    pInfo->winSup.prevTs = INT64_MIN;
  }

  SWindowRowsSup* pRowSup = &pInfo->winSup;
  pRowSup->numOfRows = 0;

  // In case of ascending or descending order scan data, only one time window needs to be kept for each table.
  TSKEY* tsList = (TSKEY*)pColInfoData->pData;
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
    if (gid != pRowSup->groupId || pInfo->winSup.prevTs == INT64_MIN) {
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
      doKeepTuple(pRowSup, tsList[j], gid);
    } else if (((tsList[j] - pRowSup->prevTs >= 0) && (tsList[j] - pRowSup->prevTs <= gap)) ||
               ((pRowSup->prevTs - tsList[j] >= 0) && (pRowSup->prevTs - tsList[j] <= gap))) {
      // The gap is less than the threshold, so it belongs to current session window that has been opened already.
      doKeepTuple(pRowSup, tsList[j], gid);
      if (j == 0 && pRowSup->startRowIndex != 0) {
        pRowSup->startRowIndex = 0;
      }
    } else {  // start a new session window
      SResultRow* pResult = NULL;

      // keep the time window for the closed time window.
      STimeWindow window = pRowSup->win;

      pRowSup->win.ekey = pRowSup->win.skey;
      int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
                                           numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
      if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
        T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
      }

      // pInfo->numOfRows data belong to the current session window
      updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false);
      applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
                                      pRowSup->numOfRows, pBlock->info.rows, numOfOutput);

      // here we start a new session window
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
      doKeepTuple(pRowSup, tsList[j], gid);
    }
  }

  // flush the session that is still open at the end of the block
  SResultRow* pResult = NULL;
  pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
  int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid,
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
  if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
  }

  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false);
  applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
                                  pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
}

1905
// getNext function of the session-window operator.
//
// On the first call the whole downstream is consumed and aggregated; the operator then switches to
// OP_RES_TO_RETURN. This call and every later call build one result block, apply the filter and
// return it, or return NULL when the block is empty. The operator is marked completed once the
// grouped results are exhausted.
static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SSessionAggOperatorInfo* pInfo = pOperator->info;
  SOptrBasicInfo*          pBInfo = &pInfo->binfo;

  if (pOperator->status != OP_RES_TO_RETURN) {
    int64_t        startUs = taosGetTimestampUs();
    SExprSupp*     pSup = &pOperator->exprSupp;
    SOperatorInfo* downstream = pOperator->pDownstream[0];

    while (1) {
      SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
      if (pBlock == NULL) {
        break;
      }

      // the pDataBlock are always the same one, no need to call this again
      setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
      blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);

      doSessionWindowAggImpl(pOperator, pInfo, pBlock);
    }

    pOperator->cost.openCost = (taosGetTimestampUs() - startUs) / 1000.0;

    // from now on only results are returned
    pOperator->status = OP_RES_TO_RETURN;

    initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
    blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
  }

  // keep building blocks until one survives the filter or the results run out
  while (1) {
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
    doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);

    if (!hasRemainResults(&pInfo->groupResInfo)) {
      setOperatorCompleted(pOperator);
      break;
    }

    if (pBInfo->pRes->info.rows > 0) {
      break;
    }
  }

  pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
  return (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
}

1976 1977
// Create the state-window aggregate operator ("StateWindowOperator") on top of `downstream`.
//
// downstream  input operator; appended as the only downstream of the new operator
// pStateNode  physical plan node providing the state key column, functions, scalar expressions,
//             conditions and the output block description
// pTaskInfo   owning task; on failure pTaskInfo->code receives the error code
//
// Returns the new operator, or NULL on failure.
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode,
                                             SExecTaskInfo* pTaskInfo) {
  // single status variable, declared ahead of every `goto _error`, so the error path always sees
  // the code of the step that failed
  int32_t code = TSDB_CODE_SUCCESS;

  SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  int32_t      tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
  SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr;

  if (pStateNode->window.pExprs != NULL) {
    int32_t    numOfScalarExpr = 0;
    SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalarExpr);
    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

  pInfo->stateCol = extractColumnFromColumnNode(pColNode);
  pInfo->stateKey.type = pInfo->stateCol.type;
  pInfo->stateKey.bytes = pInfo->stateCol.bytes;
  pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes);
  if (pInfo->stateKey.pData == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  code = filterInitFromNode((SNode*)pStateNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;

  int32_t    num = 0;
  SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num);
  initResultSizeInfo(&pOperator->resultInfo, 4096);

  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
                    pTaskInfo->streamInfo.pState);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  SSDataBlock* pResBlock = createDataBlockFromDescNode(pStateNode->window.node.pOutputDataBlockDesc);
  initBasicInfo(&pInfo->binfo, pResBlock);
  initResultRowInfo(&pInfo->binfo.resultRowInfo);

  pInfo->twAggSup =
      (STimeWindowAggSupp){.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType};

  initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);

  pInfo->tsSlotId = tsSlotId;

  setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, destroyStateWindowOperatorInfo,
                                         optrDefaultBufFn, NULL);

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  if (pInfo != NULL) {
    destroyStateWindowOperatorInfo(pInfo);
  }

  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

2054
void destroySWindowOperatorInfo(void* param) {
2055
  SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*)param;
2056 2057 2058
  if (pInfo == NULL) {
    return;
  }
2059

2060
  cleanupBasicInfo(&pInfo->binfo);
H
Haojun Liao 已提交
2061 2062 2063 2064
  colDataDestroy(&pInfo->twAggSup.timeWindowData);

  cleanupAggSup(&pInfo->aggSup);
  cleanupGroupResInfo(&pInfo->groupResInfo);
D
dapan1121 已提交
2065
  taosMemoryFreeClear(param);
2066 2067
}

H
Haojun Liao 已提交
2068
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
2069
                                            SExecTaskInfo* pTaskInfo) {
2070 2071 2072 2073 2074 2075
  SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo));
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

2076
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
2077
  initResultSizeInfo(&pOperator->resultInfo, 4096);
2078

2079
  int32_t      numOfCols = 0;
H
Haojun Liao 已提交
2080
  SExprInfo*   pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols);
H
Haojun Liao 已提交
2081
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc);
H
Haojun Liao 已提交
2082
  initBasicInfo(&pInfo->binfo, pResBlock);
H
Haojun Liao 已提交
2083

L
Liu Jicong 已提交
2084 2085
  int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
                            pTaskInfo->streamInfo.pState);
2086 2087 2088 2089
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
2090 2091 2092 2093
  pInfo->twAggSup.waterMark = pSessionNode->window.watermark;
  pInfo->twAggSup.calTrigger = pSessionNode->window.triggerType;
  pInfo->gap = pSessionNode->gap;

2094
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
2095 2096
  initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);

2097
  pInfo->tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
L
Liu Jicong 已提交
2098 2099 2100
  pInfo->binfo.pRes = pResBlock;
  pInfo->winSup.prevTs = INT64_MIN;
  pInfo->reptScan = false;
H
Haojun Liao 已提交
2101 2102 2103 2104
  code = filterInitFromNode((SNode*)pSessionNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
H
Haojun Liao 已提交
2105

L
Liu Jicong 已提交
2106 2107
  setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
L
Liu Jicong 已提交
2108 2109
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSessionWindowAgg, NULL, destroySWindowOperatorInfo,
                                         optrDefaultBufFn, NULL);
2110 2111
  pOperator->pTaskInfo = pTaskInfo;
  code = appendDownstream(pOperator, &downstream, 1);
2112 2113 2114 2115
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

2116 2117
  return pOperator;

L
Liu Jicong 已提交
2118
_error:
2119
  destroySWindowOperatorInfo(pInfo);
2120 2121 2122
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
L
Liu Jicong 已提交
2123
}
5
54liuyao 已提交
2124

5
54liuyao 已提交
2125
void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int32_t numOfOutput,
2126
                      SExecTaskInfo* pTaskInfo, SColumnInfoData* pTimeWindowData) {
5
54liuyao 已提交
2127 2128
  for (int32_t k = 0; k < numOfOutput; ++k) {
    if (fmIsWindowPseudoColumnFunc(pDestCtx[k].functionId)) {
2129 2130 2131 2132 2133
      if (!pTimeWindowData) {
        continue;
      }

      SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pDestCtx[k]);
L
Liu Jicong 已提交
2134 2135
      char*                p = GET_ROWCELL_INTERBUF(pEntryInfo);
      SColumnInfoData      idata = {0};
2136 2137 2138 2139 2140 2141 2142 2143
      idata.info.type = TSDB_DATA_TYPE_BIGINT;
      idata.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
      idata.pData = p;

      SScalarParam out = {.columnData = &idata};
      SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
      pDestCtx[k].sfp.process(&tw, 1, &out);
      pEntryInfo->numOfRes = 1;
L
Liu Jicong 已提交
2144
    } else if (functionNeedToExecute(&pDestCtx[k]) && pDestCtx[k].fpSet.combine != NULL) {
2145
      int32_t code = pDestCtx[k].fpSet.combine(&pDestCtx[k], &pSourceCtx[k]);
5
54liuyao 已提交
2146 2147 2148
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s apply functions error, code: %s", GET_TASKID(pTaskInfo), tstrerror(code));
        pTaskInfo->code = code;
2149
        T_LONG_JMP(pTaskInfo->env, code);
5
54liuyao 已提交
2150 2151 2152 2153 2154
      }
    }
  }
}

2155 2156
// Return true when the stream state lookup for pKey succeeds.
bool hasIntervalWindow(SStreamState* pState, SWinKey* pKey) {
  int32_t code = streamStateGet(pState, pKey, NULL, 0);
  return code == TSDB_CODE_SUCCESS;
}

5
54liuyao 已提交
2159
// Recompute the result of every window in pWinArray from the child operators' partial results.
//
// For each window key that is not reported as deleted, the results of all children holding that
// window are combined into the parent's result row. When at least one child contributed and
// pUpdatedMap is not NULL, the window is recorded in pUpdatedMap and the parent row is saved and
// released. Does nothing when the operator has no children.
static void rebuildIntervalWindow(SOperatorInfo* pOperator, SArray* pWinArray, SHashObj* pUpdatedMap) {
  SStreamIntervalOperatorInfo* pInfo = pOperator->info;
  if (pInfo->pChildren == NULL) {
    return;
  }

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SExprSupp*     pSup = &pOperator->exprSupp;
  int32_t        numOfOutput = pSup->numOfExprs;
  int32_t        numOfWins = taosArrayGetSize(pWinArray);
  int32_t        numOfChildren = taosArrayGetSize(pInfo->pChildren);

  for (int32_t w = 0; w < numOfWins; ++w) {
    SWinKey*    pWinRes = taosArrayGet(pWinArray, w);
    SResultRow* pCurResult = NULL;
    STimeWindow parentWin = getFinalTimeWindow(pWinRes->ts, &pInfo->interval);
    if (isDeletedStreamWindow(&parentWin, pWinRes->groupId, pInfo->pState, &pInfo->twAggSup)) {
      continue;
    }

    int32_t merged = 0;  // number of children that contributed to this window
    for (int32_t c = 0; c < numOfChildren; ++c) {
      SOperatorInfo*               pChildOp = taosArrayGetP(pInfo->pChildren, c);
      SStreamIntervalOperatorInfo* pChInfo = pChildOp->info;
      SExprSupp*                   pChildSup = &pChildOp->exprSupp;
      if (!hasIntervalWindow(pChInfo->pState, pWinRes)) {
        continue;
      }

      // the parent row is only fetched once, when the first contributing child is found
      if (merged == 0) {
        int32_t code = setOutputBuf(pInfo->pState, &parentWin, &pCurResult, pWinRes->groupId, pSup->pCtx, numOfOutput,
                                    pSup->rowEntryInfoOffset, &pInfo->aggSup);
        if (code != TSDB_CODE_SUCCESS || pCurResult == NULL) {
          T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
        }
      }
      merged++;

      SResultRow* pChResult = NULL;
      setOutputBuf(pChInfo->pState, &parentWin, &pChResult, pWinRes->groupId, pChildSup->pCtx, pChildSup->numOfExprs,
                   pChildSup->rowEntryInfoOffset, &pChInfo->aggSup);
      updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin, true);
      compactFunctions(pSup->pCtx, pChildSup->pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
      releaseOutputBuf(pChInfo->pState, pWinRes, pChResult);
    }

    if (merged > 0 && pUpdatedMap != NULL) {
      saveWinResultInfo(pCurResult->win.skey, pWinRes->groupId, pUpdatedMap);
      saveOutputBuf(pInfo->pState, pWinRes, pCurResult, pInfo->aggSup.resultRowSize);
      releaseOutputBuf(pInfo->pState, pWinRes, pCurResult);
    }
  }
}

// Returns true when no entry for (pWin->skey, groupId) exists in the supporter's
// result-row hash table; returns false when an entry is found.
bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) {
  // build the lookup key in the supporter's key buffer from the window start key and the group id
  SET_RES_WINDOW_KEY(pSup->keyBuf, &pWin->skey, sizeof(int64_t), groupId);

  void* pRowPos = tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(int64_t)));
  if (pRowPos != NULL) {
    return false;
  }
  return true;
}

2215
// Returns true only when the window ends before (maxTs - deleteMark) AND streamStateGet()
// does not succeed for the key {pWin->skey, groupId}. In every other case returns false.
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup) {
  // windows that have not fallen behind the delete mark are never reported as deleted
  if (pWin->ekey >= pTwSup->maxTs - pTwSup->deleteMark) {
    return false;
  }

  SWinKey key = {.ts = pWin->skey, .groupId = groupId};
  return streamStateGet(pState, &key, NULL, 0) != TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
2226 2227 2228 2229
// Skips the rows of the block that belong to the window ending at `eKey` (counted from `startPos`,
// ascending order) and asks getNextQualifiedWindow() for the next window, which is written to
// `pNextWin`. Returns whatever position getNextQualifiedWindow() returns.
int32_t getNexWindowPos(SInterval* pInterval, SDataBlockInfo* pBlockInfo, TSKEY* tsCols, int32_t startPos, TSKEY eKey,
                        STimeWindow* pNextWin) {
  int32_t rowsInWin =
      getNumOfRowsInTimeWindow(pBlockInfo, tsCols, startPos, eKey, binarySearchForKey, NULL, TSDB_ORDER_ASC);

  // index of the last row covered by the current window
  int32_t lastRowInWin = rowsInWin - 1 + startPos;
  return getNextQualifiedWindow(pInterval, pNextWin, pBlockInfo, tsCols, lastRowInWin, TSDB_ORDER_ASC);
}

H
Haojun Liao 已提交
2234
// Registers a pending pull request for window key `pWinRes` in `pMap`.
// The map value is a pointer to a newly allocated SArray holding the ids 0..size-1.
// Other code in this file removes ids from that array and destroys it once it is empty.
//
// If the id array cannot be allocated nothing is registered. If the map insertion fails the
// array is released here, because no other owner exists for it.
void addPullWindow(SHashObj* pMap, SWinKey* pWinRes, int32_t size) {
  SArray* childIds = taosArrayInit(8, sizeof(int32_t));
  if (childIds == NULL) {
    return;
  }

  for (int32_t i = 0; i < size; i++) {
    taosArrayPush(childIds, &i);
  }

  // the map stores the array pointer itself, not a copy of the array
  if (taosHashPut(pMap, pWinRes, sizeof(SWinKey), &childIds, sizeof(void*)) != 0) {
    taosArrayDestroy(childIds);
  }
}

// Returns the child id stored in the block's info. Callers in this file use it as an index
// into the operator's pChildren array and as the id looked up in pull-data id lists.
static int32_t getChildIndex(SSDataBlock* pBlock) {
  return pBlock->info.childId;
}

2244
// Resets the operator's accumulated aggregation state: clears the result-row hash table and the
// disk-based result buffer, re-initializes the result row info, marks the current page id as
// unset (-1) and clears the operator's stream state.
static void clearStreamIntervalOperator(SStreamIntervalOperatorInfo* pInfo) {
  tSimpleHashClear(pInfo->aggSup.pResultRowHashTable);
  clearDiskbasedBuf(pInfo->aggSup.pResultBuf);

  initResultRowInfo(&pInfo->binfo.resultRowInfo);
  pInfo->aggSup.currentPageId = -1;

  streamStateClear(pInfo->pState);
}

5
54liuyao 已提交
2252 2253 2254 2255
// Cleans up the block via blockDataCleanup() only when it currently holds at least one row.
static void clearSpecialDataBlock(SSDataBlock* pBlock) {
  if (pBlock->info.rows > 0) {
    blockDataCleanup(pBlock);
  }
}

5
54liuyao 已提交
2259 2260 2261 2262 2263 2264
// Fills `pBlock` with one row per SPullWindowInfo in `array`, starting at element *pIndex.
// Each row carries the window start/end, the group id and the calculate-window start/end.
// The block is cleared first. When every element has been consumed, *pIndex is reset to 0 and
// the array is cleared. Nothing else happens when there are no remaining elements.
static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pBlock) {
  clearSpecialDataBlock(pBlock);

  int32_t total = taosArrayGetSize(array);
  int32_t pending = total - (*pIndex);
  if (pending == 0) {
    return;
  }
  blockDataEnsureCapacity(pBlock, pending);

  SColumnInfoData* pStartTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pGroupId = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  while ((*pIndex) < total) {
    SPullWindowInfo* pWin = taosArrayGet(array, (*pIndex));

    // append one output row describing this pull window
    colDataAppend(pStartTs, pBlock->info.rows, (const char*)&pWin->window.skey, false);
    colDataAppend(pEndTs, pBlock->info.rows, (const char*)&pWin->window.ekey, false);
    colDataAppend(pGroupId, pBlock->info.rows, (const char*)&pWin->groupId, false);
    colDataAppend(pCalStartTs, pBlock->info.rows, (const char*)&pWin->calWin.skey, false);
    colDataAppend(pCalEndTs, pBlock->info.rows, (const char*)&pWin->calWin.ekey, false);
    pBlock->info.rows++;

    (*pIndex)++;
  }

  // everything consumed: rewind the cursor and drop the processed requests
  if ((*pIndex) == total) {
    *pIndex = 0;
    taosArrayClear(array);
  }
  blockDataUpdateTsWindow(pBlock, 0);
}

5
54liuyao 已提交
2287
// Handles a "pull over" block. For each row, every window start timestamp from the row's start
// column up to (but excluding) its end column is visited, stepping by the sliding interval.
// For each visited window key found in `pMap`, the block's child id is removed from the stored
// id array. When that array becomes empty it is destroyed and the key is removed from the map.
void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SInterval* pInterval) {
  SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  TSKEY*           startTs = (TSKEY*)pStartCol->pData;
  SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  TSKEY*           endTs = (TSKEY*)pEndCol->pData;
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  uint64_t*        groupIds = (uint64_t*)pGroupCol->pData;
  int32_t          chId = getChildIndex(pBlock);

  for (int32_t row = 0; row < pBlock->info.rows; row++) {
    for (TSKEY winTs = startTs[row]; winTs < endTs[row];
         winTs = taosTimeAdd(winTs, pInterval->sliding, pInterval->slidingUnit, pInterval->precision)) {
      SWinKey winRes = {.ts = winTs, .groupId = groupIds[row]};
      void*   chIds = taosHashGet(pMap, &winRes, sizeof(SWinKey));
      if (chIds == NULL) {
        continue;  // no pending pull request for this window
      }

      SArray* chArray = *(SArray**)chIds;
      int32_t index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
      if (index == -1) {
        continue;  // this child id is not in the pending list
      }

      qDebug("===stream===window %" PRId64 " delete child id %d", winRes.ts, chId);
      taosArrayRemove(chArray, index);
      if (taosArrayGetSize(chArray) == 0) {
        // pull data is over
        taosArrayDestroy(chArray);
        taosHashRemove(pMap, &winRes, sizeof(SWinKey));
      }
    }
  }
}
5
54liuyao 已提交
2317

2318
// For every window key in `wins`, computes its final time window and, when
// needDeleteWindowBuf() holds for it, expired data is not ignored, and no pull request is
// already pending for the key, saves a pull window and registers a pending pull entry for it.
static void addRetriveWindow(SArray* wins, SStreamIntervalOperatorInfo* pInfo) {
  int32_t numOfWins = taosArrayGetSize(wins);
  for (int32_t i = 0; i < numOfWins; i++) {
    SWinKey*    winKey = taosArrayGet(wins, i);
    STimeWindow nextWin = getFinalTimeWindow(winKey->ts, &pInfo->interval);

    if (!needDeleteWindowBuf(&nextWin, &pInfo->twAggSup) || pInfo->ignoreExpiredData) {
      continue;
    }

    // a request for this window is already outstanding
    if (taosHashGet(pInfo->pPullDataMap, winKey, sizeof(SWinKey)) != NULL) {
      continue;
    }

    SPullWindowInfo pull = {
        .window = nextWin, .groupId = winKey->groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey};
    // add pull data request
    if (savePullWindow(&pull, pInfo->pPullWins) != TSDB_CODE_SUCCESS) {
      continue;
    }

    int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren);
    addPullWindow(pInfo->pPullDataMap, winKey, numOfChildren);
    qDebug("===stream===prepare retrive for delete %" PRId64 ", size:%d", winKey->ts, numOfChildren);
  }
}

5
54liuyao 已提交
2339 2340 2341 2342 2343 2344
// Sets saveHandle.currentPage to -1 on every function context of the expression supporter.
static void clearFunctionContext(SExprSupp* pSup) {
  int32_t idx = 0;
  while (idx < pSup->numOfExprs) {
    pSup->pCtx[idx].saveHandle.currentPage = -1;
    idx++;
  }
}

2345 2346 2347 2348 2349 2350 2351 2352 2353 2354 2355
// Stamps the output block with the task's version, cleans it up and, when the group result info
// still has remaining results, resets the block's group id and fills it from those results.
void doBuildResult(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) {
  // set output datablock version
  pBlock->info.version = pOperator->pTaskInfo->version;

  blockDataCleanup(pBlock);
  if (hasRemainResults(pGroupResInfo)) {
    // clear the existed group id
    pBlock->info.id.groupId = 0;
    buildDataBlockFromGroupRes(pOperator, pState, pBlock, &pOperator->exprSupp, pGroupResInfo);
  }
}

5
54liuyao 已提交
2360 2361 2362 2363 2364 2365 2366 2367 2368 2369 2370
// Advances to the row after `prevPosition`. Returns -1 when that row index equals the block's
// row count; otherwise stores the final time window of that row's timestamp in `pNext` and
// returns the row index.
static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
                                           TSKEY* primaryKeys, int32_t prevPosition) {
  int32_t nextPos = prevPosition + 1;
  if (nextPos == pDataBlockInfo->rows) {
    return -1;
  }

  *pNext = getFinalTimeWindow(primaryKeys[nextPos], pInterval);
  return nextPos;
}

2371 2372 2373 2374 2375 2376 2377 2378 2379 2380 2381 2382 2383 2384 2385 2386 2387 2388
static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t groupId,
                                    SHashObj* pUpdatedMap) {
  SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperatorInfo->info;

  SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo);
  SExecTaskInfo*  pTaskInfo = pOperatorInfo->pTaskInfo;
  SExprSupp*      pSup = &pOperatorInfo->exprSupp;
  int32_t         numOfOutput = pSup->numOfExprs;
  int32_t         step = 1;
  TSKEY*          tsCols = NULL;
  SResultRow*     pResult = NULL;
  int32_t         forwardRows = 0;

  SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
  tsCols = (int64_t*)pColDataInfo->pData;

  int32_t     startPos = 0;
  TSKEY       ts = getStartTsKey(&pSDataBlock->info.window, tsCols);
5
54liuyao 已提交
2389 2390 2391 2392 2393 2394
  STimeWindow nextWin = {0};
  if (IS_FINAL_OP(pInfo)) {
    nextWin = getFinalTimeWindow(ts, &pInfo->interval);
  } else {
    nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, TSDB_ORDER_ASC);
  }
2395 2396 2397 2398 2399 2400 2401 2402 2403 2404 2405 2406 2407 2408 2409 2410 2411 2412
  while (1) {
    bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup);
    if ((pInfo->ignoreExpiredData && isClosed) || !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) {
      startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin);
      if (startPos < 0) {
        break;
      }
      continue;
    }

    if (IS_FINAL_OP(pInfo) && isClosed && pInfo->pChildren) {
      bool    ignore = true;
      SWinKey winRes = {
          .ts = nextWin.skey,
          .groupId = groupId,
      };
      void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey));
      if (isDeletedStreamWindow(&nextWin, groupId, pInfo->pState, &pInfo->twAggSup) && !chIds) {
L
Liu Jicong 已提交
2413 2414
        SPullWindowInfo pull = {
            .window = nextWin, .groupId = groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey};
2415
        // add pull data request
5
54liuyao 已提交
2416 2417 2418 2419 2420
        if (savePullWindow(&pull, pInfo->pPullWins) == TSDB_CODE_SUCCESS) {
          int32_t size = taosArrayGetSize(pInfo->pChildren);
          addPullWindow(pInfo->pPullDataMap, &winRes, size);
          qDebug("===stream===prepare retrive %" PRId64 ", size:%d", winRes.ts, size);
        }
2421 2422 2423 2424 2425 2426 2427 2428 2429 2430 2431 2432 2433 2434 2435 2436 2437 2438 2439 2440 2441 2442 2443 2444 2445 2446
      } else {
        int32_t index = -1;
        SArray* chArray = NULL;
        int32_t chId = 0;
        if (chIds) {
          chArray = *(void**)chIds;
          chId = getChildIndex(pSDataBlock);
          index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
        }
        if (index == -1 || pSDataBlock->info.type == STREAM_PULL_DATA) {
          ignore = false;
        }
      }

      if (ignore) {
        startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin);
        if (startPos < 0) {
          break;
        }
        continue;
      }
    }

    int32_t code = setOutputBuf(pInfo->pState, &nextWin, &pResult, groupId, pSup->pCtx, numOfOutput,
                                pSup->rowEntryInfoOffset, &pInfo->aggSup);
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
S
Shengliang Guan 已提交
2447
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
2448 2449
    }

5
54liuyao 已提交
2450 2451 2452 2453 2454 2455
    if (IS_FINAL_OP(pInfo)) {
      forwardRows = 1;
    } else {
      forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey,
                                             NULL, TSDB_ORDER_ASC);
    }
2456 2457 2458
    if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) {
      saveWinResultInfo(pResult->win.skey, groupId, pUpdatedMap);
    }
5
54liuyao 已提交
2459 2460 2461 2462 2463 2464 2465 2466

    if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
      SWinKey key = {
          .ts = pResult->win.skey,
          .groupId = groupId,
      };
      tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), NULL, 0);
    }
2467
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
H
Haojun Liao 已提交
2468
    applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
L
Liu Jicong 已提交
2469
                                    pSDataBlock->info.rows, numOfOutput);
2470 2471 2472 2473 2474 2475 2476 2477 2478 2479
    SWinKey key = {
        .ts = nextWin.skey,
        .groupId = groupId,
    };
    saveOutputBuf(pInfo->pState, &key, pResult, pInfo->aggSup.resultRowSize);
    releaseOutputBuf(pInfo->pState, &key, pResult);
    if (pInfo->delKey.ts > key.ts) {
      pInfo->delKey = key;
    }
    int32_t prevEndPos = (forwardRows - 1) * step + startPos;
2480
    ASSERT(pSDataBlock->info.window.skey > 0 && pSDataBlock->info.window.ekey > 0);
5
54liuyao 已提交
2481 2482 2483 2484 2485 2486
    if (IS_FINAL_OP(pInfo)) {
      startPos = getNextQualifiedFinalWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos);
    } else {
      startPos =
          getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, TSDB_ORDER_ASC);
    }
2487 2488 2489 2490 2491 2492
    if (startPos < 0) {
      break;
    }
  }
}

5
54liuyao 已提交
2493
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
2494
  SStreamIntervalOperatorInfo* pInfo = pOperator->info;
L
Liu Jicong 已提交
2495
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
L
Liu Jicong 已提交
2496 2497 2498

  SOperatorInfo* downstream = pOperator->pDownstream[0];
  TSKEY          maxTs = INT64_MIN;
5
54liuyao 已提交
2499
  TSKEY          minTs = INT64_MAX;
5
54liuyao 已提交
2500

2501 2502
  SExprSupp* pSup = &pOperator->exprSupp;

5
54liuyao 已提交
2503
  qDebug("interval status %d %s", pOperator->status, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
L
Liu Jicong 已提交
2504

5
54liuyao 已提交
2505 2506 2507
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  } else if (pOperator->status == OP_RES_TO_RETURN) {
5
54liuyao 已提交
2508 2509 2510
    doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes);
    if (pInfo->pPullDataRes->info.rows != 0) {
      // process the rest of the data
5
54liuyao 已提交
2511
      printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
5
54liuyao 已提交
2512 2513 2514
      return pInfo->pPullDataRes;
    }

2515
    doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
2516 2517 2518 2519 2520 2521
    if (pInfo->pDelRes->info.rows != 0) {
      // process the rest of the data
      printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
      return pInfo->pDelRes;
    }

2522
    doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
5
54liuyao 已提交
2523 2524 2525
    if (pInfo->binfo.pRes->info.rows != 0) {
      printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
      return pInfo->binfo.pRes;
5
54liuyao 已提交
2526
    }
5
54liuyao 已提交
2527

H
Haojun Liao 已提交
2528
    setOperatorCompleted(pOperator);
5
54liuyao 已提交
2529 2530 2531 2532 2533 2534
    if (!IS_FINAL_OP(pInfo)) {
      clearFunctionContext(&pOperator->exprSupp);
      // semi interval operator clear disk buffer
      clearStreamIntervalOperator(pInfo);
      qDebug("===stream===clear semi operator");
    } else {
2535 2536
      deleteIntervalDiscBuf(pInfo->pState, pInfo->pPullDataMap, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark,
                            &pInfo->interval, &pInfo->delKey);
L
Liu Jicong 已提交
2537
      // streamStateCommit(pTaskInfo->streamInfo.pState);
5
54liuyao 已提交
2538 2539
    }
    return NULL;
5
54liuyao 已提交
2540
  } else {
5
54liuyao 已提交
2541
    if (!IS_FINAL_OP(pInfo)) {
2542
      doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
2543 2544 2545 2546 2547 2548
      if (pInfo->pDelRes->info.rows != 0) {
        // process the rest of the data
        printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
        return pInfo->pDelRes;
      }

2549
      doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
5
54liuyao 已提交
2550
      if (pInfo->binfo.pRes->info.rows != 0) {
5
54liuyao 已提交
2551
        printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
5
54liuyao 已提交
2552 2553
        return pInfo->binfo.pRes;
      }
5
54liuyao 已提交
2554
    }
5
54liuyao 已提交
2555 2556
  }

5
54liuyao 已提交
2557 2558 2559
  SArray*    pUpdated = taosArrayInit(4, POINTER_BYTES);
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  SHashObj*  pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
5
54liuyao 已提交
2560 2561 2562
  while (1) {
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
    if (pBlock == NULL) {
5
54liuyao 已提交
2563
      pOperator->status = OP_RES_TO_RETURN;
L
Liu Jicong 已提交
2564 2565
      qDebug("===stream===return data:%s. recv datablock num:%" PRIu64,
             IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", pInfo->numOfDatapack);
5
54liuyao 已提交
2566
      pInfo->numOfDatapack = 0;
5
54liuyao 已提交
2567 2568
      break;
    }
5
54liuyao 已提交
2569
    pInfo->numOfDatapack++;
5
54liuyao 已提交
2570
    printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval final recv" : "interval semi recv");
2571

H
Haojun Liao 已提交
2572
    if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) {
5
54liuyao 已提交
2573
      pInfo->binfo.pRes->info.type = pBlock->info.type;
2574 2575
    } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
               pBlock->info.type == STREAM_CLEAR) {
2576
      SArray* delWins = taosArrayInit(8, sizeof(SWinKey));
2577
      doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pUpdatedMap);
2578
      if (IS_FINAL_OP(pInfo)) {
2579 2580 2581 2582
        int32_t                      childIndex = getChildIndex(pBlock);
        SOperatorInfo*               pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
        SStreamIntervalOperatorInfo* pChildInfo = pChildOp->info;
        SExprSupp*                   pChildSup = &pChildOp->exprSupp;
2583
        doDeleteWindows(pChildOp, &pChildInfo->interval, pBlock, NULL, NULL);
5
54liuyao 已提交
2584
        rebuildIntervalWindow(pOperator, delWins, pUpdatedMap);
2585 2586 2587
        addRetriveWindow(delWins, pInfo);
        taosArrayAddAll(pInfo->pDelWins, delWins);
        taosArrayDestroy(delWins);
2588 2589
        continue;
      }
2590 2591 2592
      removeResults(delWins, pUpdatedMap);
      taosArrayAddAll(pInfo->pDelWins, delWins);
      taosArrayDestroy(delWins);
2593
      break;
5
54liuyao 已提交
2594
    } else if (pBlock->info.type == STREAM_GET_ALL && IS_FINAL_OP(pInfo)) {
5
54liuyao 已提交
2595
      getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap);
5
54liuyao 已提交
2596
      continue;
5
54liuyao 已提交
2597
    } else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) {
2598
      doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pUpdatedMap);
5
54liuyao 已提交
2599 2600 2601 2602
      if (taosArrayGetSize(pUpdated) > 0) {
        break;
      }
      continue;
L
Liu Jicong 已提交
2603
    } else if (pBlock->info.type == STREAM_PULL_OVER && IS_FINAL_OP(pInfo)) {
5
54liuyao 已提交
2604
      processPullOver(pBlock, pInfo->pPullDataMap, &pInfo->interval);
5
54liuyao 已提交
2605
      continue;
5
54liuyao 已提交
2606
    }
5
54liuyao 已提交
2607

5
54liuyao 已提交
2608 2609 2610 2611
    if (pInfo->scalarSupp.pExprInfo != NULL) {
      SExprSupp* pExprSup = &pInfo->scalarSupp;
      projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
    }
2612
    setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
H
Haojun Liao 已提交
2613
    doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pUpdatedMap);
5
54liuyao 已提交
2614
    if (IS_FINAL_OP(pInfo)) {
S
shenglian zhou 已提交
2615
      int32_t chIndex = getChildIndex(pBlock);
5
54liuyao 已提交
2616 2617 2618 2619 2620
      int32_t size = taosArrayGetSize(pInfo->pChildren);
      // if chIndex + 1 - size > 0, add new child
      for (int32_t i = 0; i < chIndex + 1 - size; i++) {
        SOperatorInfo* pChildOp = createStreamFinalIntervalOperatorInfo(NULL, pInfo->pPhyNode, pOperator->pTaskInfo, 0);
        if (!pChildOp) {
S
Shengliang Guan 已提交
2621
          T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
5
54liuyao 已提交
2622
        }
2623
        SStreamIntervalOperatorInfo* pTmpInfo = pChildOp->info;
2624
        pTmpInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
5
54liuyao 已提交
2625
        taosArrayPush(pInfo->pChildren, &pChildOp);
5
54liuyao 已提交
2626
        qDebug("===stream===add child, id:%d", chIndex);
5
54liuyao 已提交
2627
      }
2628 2629
      SOperatorInfo*               pChildOp = taosArrayGetP(pInfo->pChildren, chIndex);
      SStreamIntervalOperatorInfo* pChInfo = pChildOp->info;
2630
      setInputDataBlock(&pChildOp->exprSupp, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
H
Haojun Liao 已提交
2631
      doStreamIntervalAggImpl(pChildOp, pBlock, pBlock->info.id.groupId, NULL);
5
54liuyao 已提交
2632
    }
5
54liuyao 已提交
2633 2634 2635
    maxTs = TMAX(maxTs, pBlock->info.window.ekey);
    maxTs = TMAX(maxTs, pBlock->info.watermark);
    minTs = TMIN(minTs, pBlock->info.window.skey);
5
54liuyao 已提交
2636
  }
S
shenglian zhou 已提交
2637

2638
  removeDeleteResults(pUpdatedMap, pInfo->pDelWins);
5
54liuyao 已提交
2639
  pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
5
54liuyao 已提交
2640
  pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs);
5
54liuyao 已提交
2641
  if (IS_FINAL_OP(pInfo)) {
2642
    closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval,
2643
                              pInfo->pPullDataMap, pUpdatedMap, pInfo->pDelWins, pOperator);
2644
    closeChildIntervalWindow(pOperator, pInfo->pChildren, pInfo->twAggSup.maxTs);
5
54liuyao 已提交
2645
  }
2646
  pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs;
5
54liuyao 已提交
2647

5
54liuyao 已提交
2648 2649 2650 2651 2652 2653 2654
  void* pIte = NULL;
  while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) {
    taosArrayPush(pUpdated, pIte);
  }
  taosHashCleanup(pUpdatedMap);
  taosArraySort(pUpdated, resultrowComparAsc);

5
54liuyao 已提交
2655 2656
  initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
  blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
5
54liuyao 已提交
2657 2658 2659 2660

  doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes);
  if (pInfo->pPullDataRes->info.rows != 0) {
    // process the rest of the data
5
54liuyao 已提交
2661
    printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
5
54liuyao 已提交
2662 2663 2664
    return pInfo->pPullDataRes;
  }

2665
  doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
5
54liuyao 已提交
2666 2667 2668 2669 2670 2671
  if (pInfo->pDelRes->info.rows != 0) {
    // process the rest of the data
    printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
    return pInfo->pDelRes;
  }

2672
  doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
5
54liuyao 已提交
2673
  if (pInfo->binfo.pRes->info.rows != 0) {
5
54liuyao 已提交
2674
    printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
5
54liuyao 已提交
2675 2676 2677 2678 2679 2680
    return pInfo->binfo.pRes;
  }

  return NULL;
}

5
54liuyao 已提交
2681 2682 2683 2684
// Returns the delete mark to use for an interval node.
// When the node's configured delete mark is not positive, DEAULT_DELETE_MARK is returned.
// Otherwise the result is the largest of the configured delete mark, the watermark and the interval.
int64_t getDeleteMark(SIntervalPhysiNode* pIntervalPhyNode) {
  if (pIntervalPhyNode->window.deleteMark > 0) {
    int64_t mark = TMAX(pIntervalPhyNode->window.deleteMark, pIntervalPhyNode->window.watermark);
    return TMAX(mark, pIntervalPhyNode->interval);
  }

  // the macro definition carries its own trailing ';', so it is only used in statement position
  return DEAULT_DELETE_MARK;
}

S
shenglian zhou 已提交
2690 2691
// Creates a stream final or semi interval operator for `pPhyNode`.
//
// downstream : operator appended as the single downstream (NULL is passed for child operators).
// pPhyNode   : interval physical plan node; its type selects final vs. semi behaviour.
// pTaskInfo  : owning task; on failure its `code` field receives the error code.
// numOfChild : number of child operators to pre-create (each created with 0 children and
//              trigger STREAM_TRIGGER_AT_ONCE).
//
// Returns the new operator, or NULL on failure. On failure the partially built info is destroyed
// and the operator struct is freed.
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
                                                     SExecTaskInfo* pTaskInfo, int32_t numOfChild) {
  // `code` is declared and initialized before the first `goto _error`, because the error path reads it
  int32_t                      code = TSDB_CODE_SUCCESS;
  SIntervalPhysiNode*          pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
  SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo));
  SOperatorInfo*               pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  pOperator->pTaskInfo = pTaskInfo;
  pInfo->interval = (SInterval){.interval = pIntervalPhyNode->interval,
                                .sliding = pIntervalPhyNode->sliding,
                                .intervalUnit = pIntervalPhyNode->intervalUnit,
                                .slidingUnit = pIntervalPhyNode->slidingUnit,
                                .offset = pIntervalPhyNode->offset,
                                .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
  pInfo->twAggSup = (STimeWindowAggSupp){
      .waterMark = pIntervalPhyNode->window.watermark,
      .calTrigger = pIntervalPhyNode->window.triggerType,
      .maxTs = INT64_MIN,
      .minTs = INT64_MAX,
      .deleteMark = getDeleteMark(pIntervalPhyNode),
      .deleteMarkSaved = 0,
      .calTriggerSaved = 0,
  };
  ASSERTS(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay");
  pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
  initResultSizeInfo(&pOperator->resultInfo, 4096);

  // optional scalar expressions evaluated on the input block before aggregation
  if (pIntervalPhyNode->window.pExprs != NULL) {
    int32_t    numOfScalar = 0;
    SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar);
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

  int32_t      numOfCols = 0;
  SExprInfo*   pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols);
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
  initBasicInfo(&pInfo->binfo, pResBlock);

  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
                    pTaskInfo->streamInfo.pState);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  initStreamFunciton(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs);

  initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);

  // the operator works on its own copy of the task's stream state
  pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
  if (pInfo->pState == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }
  *(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
  streamStateSetNumber(pInfo->pState, -1);

  initResultRowInfo(&pInfo->binfo.resultRowInfo);
  pInfo->pChildren = NULL;
  if (numOfChild > 0) {
    pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*));
    if (!pInfo->pChildren) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }
    for (int32_t i = 0; i < numOfChild; i++) {
      SOperatorInfo* pChildOp = createStreamFinalIntervalOperatorInfo(NULL, pPhyNode, pTaskInfo, 0);
      if (pChildOp == NULL) {
        // keep the code reported by the failed child creation; never report success for a failure
        code = (pTaskInfo->code != TSDB_CODE_SUCCESS) ? pTaskInfo->code : TSDB_CODE_OUT_OF_MEMORY;
        goto _error;
      }
      SStreamIntervalOperatorInfo* pChInfo = pChildOp->info;
      pChInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
      taosArrayPush(pInfo->pChildren, &pChildOp);
      streamStateSetNumber(pChInfo->pState, i);
    }
  }

  pInfo->pPhyNode = (SPhysiNode*)nodesCloneNode((SNode*)pPhyNode);

  if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
    pInfo->isFinal = true;
    pOperator->name = "StreamFinalIntervalOperator";
  } else {
    // semi interval operator does not catch result
    pInfo->isFinal = false;
    pOperator->name = "StreamSemiIntervalOperator";
  }

  if (!IS_FINAL_OP(pInfo) || numOfChild == 0) {
    pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
  }
  pInfo->pPullWins = taosArrayInit(8, sizeof(SPullWindowInfo));
  pInfo->pullIndex = 0;
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  pInfo->pPullDataMap = taosHashInit(64, hashFn, false, HASH_NO_LOCK);
  pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE);
  pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired;
  pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
  pInfo->delIndex = 0;
  pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
  pInfo->delKey.ts = INT64_MAX;
  pInfo->delKey.groupId = 0;
  pInfo->numOfDatapack = 0;

  pOperator->operatorType = pPhyNode->type;
  pOperator->blocking = true;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;

  pOperator->fpSet = createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo,
                                         optrDefaultBufFn, NULL);
  if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
    initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup);
  }
  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  // pInfo is NULL when its own allocation failed
  if (pInfo != NULL) {
    destroyStreamFinalIntervalOperatorInfo(pInfo);
  }
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}
5
54liuyao 已提交
2817 2818

// Release the resources held by a stream aggregate supporter.
// Only the members are released; the supporter struct itself is not freed.
void destroyStreamAggSupporter(SStreamAggSupporter* pSup) {
  // result-row hash and the buffer created by createDiskbasedBuf
  tSimpleHashCleanup(pSup->pResultRows);
  destroyDiskbasedBuf(pSup->pResultBuf);

  // block created as a STREAM_CLEAR special block
  blockDataDestroy(pSup->pScanBlock);

  // heap allocations made in initStreamAggSupporter
  taosMemoryFreeClear(pSup->pState);
  taosMemoryFreeClear(pSup->pDummyCtx);
}

2826
void destroyStreamSessionAggOperatorInfo(void* param) {
5
54liuyao 已提交
2827
  SStreamSessionAggOperatorInfo* pInfo = (SStreamSessionAggOperatorInfo*)param;
2828
  cleanupBasicInfo(&pInfo->binfo);
5
54liuyao 已提交
2829
  destroyStreamAggSupporter(&pInfo->streamAggSup);
5
54liuyao 已提交
2830

2831 2832 2833
  if (pInfo->pChildren != NULL) {
    int32_t size = taosArrayGetSize(pInfo->pChildren);
    for (int32_t i = 0; i < size; i++) {
5
54liuyao 已提交
2834 2835
      SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, i);
      destroyOperatorInfo(pChild);
2836
    }
5
54liuyao 已提交
2837
    taosArrayDestroy(pInfo->pChildren);
2838
  }
5
54liuyao 已提交
2839 2840 2841 2842
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
  blockDataDestroy(pInfo->pDelRes);
  blockDataDestroy(pInfo->pWinBlock);
  blockDataDestroy(pInfo->pUpdateRes);
5
54liuyao 已提交
2843
  tSimpleHashCleanup(pInfo->pStDeleted);
2844

D
dapan1121 已提交
2845
  taosMemoryFreeClear(param);
5
54liuyao 已提交
2846 2847
}

2848 2849
// Initialise the basic operator info and its expression supporter, then
// prepare the function contexts for stream processing.
//
// Returns the error from initExprSupp on failure, TSDB_CODE_SUCCESS otherwise.
int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfCols,
                        SSDataBlock* pResultBlock) {
  initBasicInfo(pBasicInfo, pResultBlock);

  int32_t ret = initExprSupp(pSup, pExprInfo, numOfCols);
  if (ret == TSDB_CODE_SUCCESS) {
    initStreamFunciton(pSup->pCtx, pSup->numOfExprs);

    // no save buffer is attached to the contexts yet
    int32_t col = 0;
    while (col < numOfCols) {
      pSup->pCtx[col].saveHandle.pBuf = NULL;
      ++col;
    }

    ASSERT(numOfCols > 0);
  }

  return ret;
}

// Copy the function id of each of the first `nums` contexts in `pCtx` into
// the matching entry of `pDummy`. No other context field is touched.
void initDummyFunction(SqlFunctionCtx* pDummy, SqlFunctionCtx* pCtx, int32_t nums) {
  int idx = 0;
  while (idx < nums) {
    pDummy[idx].functionId = pCtx[idx].functionId;
    ++idx;
  }
}
5
54liuyao 已提交
2870

5
54liuyao 已提交
2871 2872
// Walk down the chain of first downstream operators until the stream scan
// operator is reached, then hand it the window supporter information.
//
// Every stream partition operator met on the way (including the starting
// operator) gets its tsColIndex set to `tsColIndex`.
void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, int32_t tsColIndex,
                    STimeWindowAggSupp* pTwSup) {
  SOperatorInfo* pOp = downstream;
  for (;;) {
    if (pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION) {
      SStreamPartitionOperatorInfo* pPartInfo = pOp->info;
      pPartInfo->tsColIndex = tsColIndex;
    }

    if (pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
      break;
    }
    pOp = pOp->pDownstream[0];
  }

  SStreamScanInfo* pScan = pOp->info;
  pScan->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type};

  // create the update info only when the scan does not have one yet
  if (pScan->pUpdateInfo == NULL) {
    pScan->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark);
  }

  pScan->twAggSup = *pTwSup;
}

5
54liuyao 已提交
2890 2891 2892 2893 2894 2895 2896 2897 2898 2899
// Initialise a stream aggregate supporter.
//
//   pSup        supporter to fill in
//   pCtx        function contexts; on success each context's save buffer is
//               pointed at the supporter's result buffer
//   numOfOutput number of entries in pCtx
//   gap         stored as the supporter's gap
//   pState      stream state that is copied into a private heap instance
//   keySize     stored as stateKeySize and added to the result row size
//   keyType     stored as stateKeyType
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when an allocation
// fails, TSDB_CODE_NO_AVAIL_DISK when no temp space is available, or the
// error reported by createDiskbasedBuf.
//
// On failure the supporter may be partially initialised.
// NOTE(review): presumably the caller releases it through
// destroyStreamAggSupporter - confirm against the callers.
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, int64_t gap,
                               SStreamState* pState, int32_t keySize, int16_t keyType) {
  pSup->resultRowSize = keySize + getResultRowSize(pCtx, numOfOutput);
  pSup->pScanBlock = createSpecialDataBlock(STREAM_CLEAR);
  pSup->gap = gap;
  pSup->stateKeySize = keySize;
  pSup->stateKeyType = keyType;

  pSup->pDummyCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfOutput, sizeof(SqlFunctionCtx));
  if (pSup->pDummyCtx == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  initDummyFunction(pSup->pDummyCtx, pCtx, numOfOutput);

  // private copy of the stream state; the allocation must be checked before
  // it is written through
  pSup->pState = taosMemoryCalloc(1, sizeof(SStreamState));
  if (pSup->pState == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  *(pSup->pState) = *pState;
  streamStateSetNumber(pSup->pState, -1);

  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  pSup->pResultRows = tSimpleHashInit(32, hashFn);
  if (pSup->pResultRows == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // grow the page size until one page can hold four result rows
  int32_t pageSize = 4096;
  while (pageSize < pSup->resultRowSize * 4) {
    pageSize <<= 1u;
  }

  // at least four pages need to be in buffer
  int32_t bufSize = 4096 * 256;
  if (bufSize <= pageSize) {
    bufSize = pageSize * 4;
  }

  if (!osTempSpaceAvailable()) {
    terrno = TSDB_CODE_NO_AVAIL_DISK;
    qError("Init stream agg supporter failed since %s", terrstr(terrno));
    return terrno;
  }

  // a failed buffer creation must be reported instead of being masked by an
  // unconditional success return
  int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, "function", tsTempDir);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  for (int32_t i = 0; i < numOfOutput; ++i) {
    pCtx[i].saveHandle.pBuf = pSup->pResultBuf;
  }

  return TSDB_CODE_SUCCESS;
}
5
54liuyao 已提交
2931 2932

// Returns true when `ts`, widened by `gap` on both sides, touches the window,
// i.e. ts + gap >= skey and ts - gap <= ekey.
bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap) {
  return (ts + gap >= pWin->skey) && (ts - gap <= pWin->ekey);
}

5
54liuyao 已提交
2939 2940
// Same test as isInTimeWindow, applied to the session window stored in `pWinInfo`.
bool isInWindow(SResultWindowInfo* pWinInfo, TSKEY ts, int64_t gap) {
  STimeWindow* pSessionRange = &pWinInfo->sessionWin.win;
  return isInTimeWindow(pSessionRange, ts, gap);
}

5
54liuyao 已提交
2943 2944 2945 2946 2947
// Fill `pKey` with the requested range and group, then ask the stream state
// for the session key matching that range. `pKey` is used both as the query
// and as the output; it is marked invalid when the lookup fails.
void getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
                         SSessionKey* pKey) {
  pKey->groupId = groupId;
  pKey->win.skey = startTs;
  pKey->win.ekey = endTs;

  if (streamStateSessionGetKeyByRange(pAggSup->pState, pKey, pKey) != TSDB_CODE_SUCCESS) {
    SET_SESSION_WIN_KEY_INVALID(pKey);
  }
}

5
54liuyao 已提交
2954
// A session window whose start key is 0 is treated as invalid.
bool isInvalidSessionWin(SResultWindowInfo* pWinInfo) {
  const TSKEY startKey = pWinInfo->sessionWin.win.skey;
  return startKey == 0;
}
5
54liuyao 已提交
2955

5
54liuyao 已提交
2956 2957 2958
// Set the key of `pCurWin` to the given range and group, then obtain its
// output buffer from the stream state via streamStateSessionAddIfNotExist.
//
// When that call reports success the window is flagged as already output;
// otherwise the key range is reset to the requested [startTs, endTs].
void setSessionOutputBuf(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
                         SResultWindowInfo* pCurWin) {
  SSessionKey* pSessKey = &pCurWin->sessionWin;
  pSessKey->groupId = groupId;
  pSessKey->win.skey = startTs;
  pSessKey->win.ekey = endTs;

  int32_t rowSize = pAggSup->resultRowSize;
  int32_t ret =
      streamStateSessionAddIfNotExist(pAggSup->pState, pSessKey, pAggSup->gap, &pCurWin->pOutputBuf, &rowSize);
  if (ret != TSDB_CODE_SUCCESS) {
    pSessKey->win.skey = startTs;
    pSessKey->win.ekey = endTs;
    return;
  }

  pCurWin->isOutput = true;
}
5
54liuyao 已提交
2971

5
54liuyao 已提交
2972 2973
// Read the session key and value at cursor `pCur` into `pWinInfo` and, when
// that succeeds, advance the cursor by one entry.
// Returns the lookup's error code on failure, TSDB_CODE_SUCCESS otherwise.
int32_t getSessionWinBuf(SStreamAggSupporter* pAggSup, SStreamStateCur* pCur, SResultWindowInfo* pWinInfo) {
  int32_t valueLen = 0;
  int32_t ret = streamStateSessionGetKVByCur(pCur, &pWinInfo->sessionWin, &pWinInfo->pOutputBuf, &valueLen);
  if (ret == TSDB_CODE_SUCCESS) {
    streamStateCurNext(pAggSup->pState, pCur);
  }
  return ret;
}
// Append `key` to `pWins` unchanged (the end key is kept as it is).
void saveDeleteInfo(SArray* pWins, SSessionKey key) {
  taosArrayPush(pWins, &key);
}

5
54liuyao 已提交
2986 2987 2988 2989
// Record `key` in the delete hash. The end key is collapsed onto the start
// key before insertion; no value is stored with the key.
void saveDeleteRes(SSHashObj* pStDelete, SSessionKey key) {
  const TSKEY startKey = key.win.skey;
  key.win.ekey = startKey;
  tSimpleHashPut(pStDelete, &key, sizeof(SSessionKey), NULL, 0);
}
2990

5
54liuyao 已提交
2991 2992 2993 2994 2995
// Remove the entry for `key` from both hash maps. The end key is collapsed
// onto the start key first, so the lookup uses the start-only form.
static void removeSessionResult(SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey key) {
  const TSKEY startKey = key.win.skey;
  key.win.ekey = startKey;

  tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey));
  tSimpleHashRemove(pResMap, &key, sizeof(SSessionKey));
}
5
54liuyao 已提交
2996

5
54liuyao 已提交
2997 2998 2999 3000 3001
// Build the hash form of a session key: a copy of `pKey` whose end key is
// replaced by its start key.
static void getSessionHashKey(const SSessionKey* pKey, SSessionKey* pHashKey) {
  const TSKEY startKey = pKey->win.skey;
  *pHashKey = *pKey;
  pHashKey->win.ekey = startKey;
}

5
54liuyao 已提交
3002 3003 3004
// Remove from `pHashMap` the hash-key form of every session key in `pWins`.
// Nothing is done when the hash map is empty.
static void removeSessionResults(SSHashObj* pHashMap, SArray* pWins) {
  if (tSimpleHashGetSize(pHashMap) == 0) {
    return;
  }

  int32_t numOfWins = taosArrayGetSize(pWins);
  int32_t idx = 0;
  while (idx < numOfWins) {
    SSessionKey* pSrcKey = taosArrayGet(pWins, idx);
    idx++;
    if (pSrcKey == NULL) {
      continue;
    }

    SSessionKey hashKey = {0};
    getSessionHashKey(pSrcKey, &hashKey);
    tSimpleHashRemove(pHashMap, &hashKey, sizeof(SSessionKey));
  }
}

dengyihao's avatar
dengyihao 已提交
3016
// Extend the session window in `pWinInfo` with consecutive rows, starting at
// row `start`, for as long as each row's start (or end, when `pEndTs` is
// given) timestamp is within `gap` of the window.
//
// When a row starts before the current window start, the window's old key is
// dropped from `pStUpdated` and `pResultRows`, and recorded in `pStDeleted`
// if the window was already output and `pStDeleted` is not NULL.
//
// Returns the number of rows absorbed into the window.
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t groupId,
                                int32_t rows, int32_t start, int64_t gap, SSHashObj* pResultRows, SSHashObj* pStUpdated,
                                SSHashObj* pStDeleted) {
  STimeWindow* pRange = &pWinInfo->sessionWin.win;

  int32_t row = start;
  while (row < rows) {
    const TSKEY rowStart = pStartTs[row];

    bool belongs = isInWindow(pWinInfo, rowStart, gap) || (pEndTs && isInWindow(pWinInfo, pEndTs[row], gap));
    if (!belongs) {
      return row - start;
    }

    if (pRange->skey > rowStart) {
      // the window start moves backwards, so the old key is obsolete
      if (pStDeleted && pWinInfo->isOutput) {
        saveDeleteRes(pStDeleted, pWinInfo->sessionWin);
      }
      removeSessionResult(pStUpdated, pResultRows, pWinInfo->sessionWin);
      pRange->skey = rowStart;
    }

    if (pRange->ekey < rowStart) {
      pRange->ekey = rowStart;
    }
    if (pEndTs && pRange->ekey < pEndTs[row]) {
      pRange->ekey = pEndTs[row];
    }

    ++row;
  }

  return rows - start;
}

5
54liuyao 已提交
3038 3039
// Expose the output buffer of `pWinInfo` as a result row through `pResult`,
// stamp it with the window's time range and bind the function contexts to it.
// Always returns TSDB_CODE_SUCCESS.
static int32_t initSessionOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pResult, SqlFunctionCtx* pCtx,
                                    int32_t numOfOutput, int32_t* rowEntryInfoOffset) {
  ASSERT(pWinInfo->sessionWin.win.skey <= pWinInfo->sessionWin.win.ekey);

  SResultRow* pRow = (SResultRow*)pWinInfo->pOutputBuf;
  *pResult = pRow;

  // set time window for current result
  pRow->win = pWinInfo->sessionWin.win;
  setResultRowInitCtx(pRow, pCtx, numOfOutput, rowEntryInfoOffset);

  return TSDB_CODE_SUCCESS;
}

5
54liuyao 已提交
3048 3049 3050
// Aggregate `winRows` rows, starting at `startIndex`, into the result row of
// session window `pCurWin`.
// Returns TSDB_CODE_OUT_OF_MEMORY when the result row cannot be set up,
// TSDB_CODE_SUCCESS otherwise.
static int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindowInfo* pCurWin, SResultRow** pResult,
                                  int32_t startIndex, int32_t winRows, int32_t rows, int32_t numOutput,
                                  SOperatorInfo* pOperator) {
  SExprSupp* pExprSup = &pOperator->exprSupp;

  int32_t ret = initSessionOutputBuf(pCurWin, pResult, pExprSup->pCtx, numOutput, pExprSup->rowEntryInfoOffset);
  if (ret != TSDB_CODE_SUCCESS || (*pResult) == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  updateTimeWindowInfo(pTimeWindowData, &pCurWin->sessionWin.win, false);
  applyAggFunctionOnPartialTuples(pOperator->pTaskInfo, pExprSup->pCtx, pTimeWindowData, startIndex, winRows, rows,
                                  numOutput);
  return TSDB_CODE_SUCCESS;
}

5
54liuyao 已提交
3062 3063
// Delete the session window `pKey` from the stream state and drop its
// hash-key form from the supporter's result-row hash. Always returns true.
static bool doDeleteSessionWindow(SStreamAggSupporter* pAggSup, SSessionKey* pKey) {
  streamStateSessionDel(pAggSup->pState, pKey);

  SSessionKey resRowKey = {0};
  getSessionHashKey(pKey, &resRowKey);
  tSimpleHashRemove(pAggSup->pResultRows, &resRowKey, sizeof(SSessionKey));

  return true;
}

// If `pStUpdated` holds an entry for the window's session key, copy that
// entry's isOutput flag into `pWinInfo`. Always returns TSDB_CODE_SUCCESS.
static int32_t setSessionWinOutputInfo(SSHashObj* pStUpdated, SResultWindowInfo* pWinInfo) {
  SResultWindowInfo* pSaved = tSimpleHashGet(pStUpdated, &pWinInfo->sessionWin, sizeof(SSessionKey));
  if (pSaved != NULL) {
    pWinInfo->isOutput = pSaved->isOutput;
  }
  return TSDB_CODE_SUCCESS;
}

5
54liuyao 已提交
3079 3080 3081 3082 3083 3084 3085
// Seek to the session window that follows `pCurWin` in the stream state and
// load its key and value into `pNextWin`. `pNextWin` is marked invalid when
// nothing can be read at the cursor.
//
// Returns the cursor that was used; the callers in this file release it with
// streamStateFreeCur.
SStreamStateCur* getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj* pStUpdated, SResultWindowInfo* pCurWin,
                                       SResultWindowInfo* pNextWin) {
  SStreamStateCur* pCursor = streamStateSessionSeekKeyNext(pAggSup->pState, &pCurWin->sessionWin);

  // NOTE(review): the isOutput lookup below runs before pNextWin->sessionWin
  // is assigned, so it uses whatever key the caller left in pNextWin - confirm
  // this is intended.
  pNextWin->isOutput = true;
  setSessionWinOutputInfo(pStUpdated, pNextWin);

  pNextWin->sessionWin = pCurWin->sessionWin;

  int32_t valueLen = 0;
  if (streamStateSessionGetKVByCur(pCursor, &pNextWin->sessionWin, &pNextWin->pOutputBuf, &valueLen) !=
      TSDB_CODE_SUCCESS) {
    SET_SESSION_WIN_INVALID(*pNextWin);
  }

  return pCursor;
}

5
54liuyao 已提交
3093 3094 3095 3096 3097 3098 3099 3100 3101
// Merge into `pCurWin` every following session window whose start key lies
// within the gap of `pCurWin`. Each merged window is removed from the
// bookkeeping hashes and from the stream state; if it had been output and
// `pStDeleted` is not NULL it is also recorded there.
static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCurWin, SSHashObj* pStUpdated,
                                 SSHashObj* pStDeleted) {
  SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
  SStreamAggSupporter*           pAggSup = &pInfo->streamAggSup;
  SExprSupp*                     pSup = &pOperator->exprSupp;
  SExecTaskInfo*                 pTaskInfo = pOperator->pTaskInfo;
  SColumnInfoData*               pTwData = &pInfo->twAggSup.timeWindowData;
  int32_t                        numOfOutput = pOperator->exprSupp.numOfExprs;

  // bind the operator's function contexts to the current window's result row
  SResultRow* pCurResult = NULL;
  initSessionOutputBuf(pCurWin, &pCurResult, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);

  // Just look for the window behind StartIndex
  for (;;) {
    SResultWindowInfo nextWin = {0};
    SStreamStateCur*  pCur = getNextSessionWinInfo(pAggSup, pStUpdated, pCurWin, &nextWin);

    bool mergeable = IS_VALID_SESSION_WIN(nextWin) && isInWindow(pCurWin, nextWin.sessionWin.win.skey, pAggSup->gap);
    if (!mergeable) {
      streamStateFreeCur(pCur);
      break;
    }

    // bind the dummy contexts to the neighbour's row, then fold it into the current one
    SResultRow* pNextResult = NULL;
    initSessionOutputBuf(&nextWin, &pNextResult, pAggSup->pDummyCtx, numOfOutput, pSup->rowEntryInfoOffset);

    if (pCurWin->sessionWin.win.ekey < nextWin.sessionWin.win.ekey) {
      pCurWin->sessionWin.win.ekey = nextWin.sessionWin.win.ekey;
    }
    updateTimeWindowInfo(pTwData, &pCurWin->sessionWin.win, true);
    compactFunctions(pSup->pCtx, pAggSup->pDummyCtx, numOfOutput, pTaskInfo, pTwData);

    // the neighbour no longer exists as a window of its own
    tSimpleHashRemove(pStUpdated, &nextWin.sessionWin, sizeof(SSessionKey));
    if (nextWin.isOutput && pStDeleted) {
      saveDeleteRes(pStDeleted, nextWin.sessionWin);
    }
    removeSessionResult(pStUpdated, pAggSup->pResultRows, nextWin.sessionWin);
    doDeleteSessionWindow(pAggSup, &nextWin.sessionWin);

    streamStateFreeCur(pCur);
  }
}

5
54liuyao 已提交
3125 3126 3127
// Hand the window's key and output buffer to saveSessionDiscBuf, using the
// supporter's result row size as the length. Always returns TSDB_CODE_SUCCESS.
int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo) {
  SSessionKey* pSessKey = &pWinInfo->sessionWin;
  saveSessionDiscBuf(pAggSup->pState, pSessKey, pWinInfo->pOutputBuf, pAggSup->resultRowSize);
  return TSDB_CODE_SUCCESS;
}

5
54liuyao 已提交
3130 3131
// Aggregate one input block into session windows.
//
// Rows are consumed in runs: for the row at the cursor a session window is
// looked up or created, extended over the following rows that belong to it,
// aggregated, merged with neighbouring windows and saved.
//
//   pStUpdated  receives the window when the trigger is STREAM_TRIGGER_AT_ONCE
//               (may be NULL)
//   pStDeleted  passed on to the window update and merge helpers
//   hasEndTs    when true the end timestamps come from the endTsIndex column,
//               otherwise from the primary timestamp column
//
// Failures are reported by long-jumping out with TSDB_CODE_OUT_OF_MEMORY.
static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pStUpdated,
                                   SSHashObj* pStDeleted, bool hasEndTs) {
  SExecTaskInfo*                 pTaskInfo = pOperator->pTaskInfo;
  SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
  SStreamAggSupporter*           pAggSup = &pInfo->streamAggSup;
  int32_t                        numOfOutput = pOperator->exprSupp.numOfExprs;
  uint64_t                       groupId = pSDataBlock->info.id.groupId;
  int32_t                        totalRows = pSDataBlock->info.rows;
  int64_t                        code = TSDB_CODE_SUCCESS;

  SColumnInfoData* pStartTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
  TSKEY*           startTsCols = (int64_t*)pStartTsCol->pData;

  SColumnInfoData* pEndTsCol =
      taosArrayGet(pSDataBlock->pDataBlock, hasEndTs ? pInfo->endTsIndex : pInfo->primaryTsIndex);
  TSKEY* endTsCols = (int64_t*)pEndTsCol->pData;

  int32_t row = 0;
  while (row < totalRows) {
    // skip rows that are overdue when expired data is ignored
    if (pInfo->ignoreExpiredData && isOverdue(endTsCols[row], &pInfo->twAggSup)) {
      row++;
      continue;
    }

    SResultWindowInfo curWin = {0};
    setSessionOutputBuf(pAggSup, startTsCols[row], endTsCols[row], groupId, &curWin);
    setSessionWinOutputInfo(pStUpdated, &curWin);
    int32_t winRows = updateSessionWindowInfo(&curWin, startTsCols, endTsCols, groupId, totalRows, row, pAggSup->gap,
                                              pAggSup->pResultRows, pStUpdated, pStDeleted);
    // coverity scan error
    if (curWin.pOutputBuf == NULL) {
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }

    SResultRow* pResult = NULL;
    code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &curWin, &pResult, row, winRows, totalRows,
                              numOfOutput, pOperator);
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }

    compactSessionWindow(pOperator, &curWin, pStUpdated, pStDeleted);
    saveSessionOutputBuf(pAggSup, &curWin);

    if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pStUpdated) {
      code = saveResult(curWin, pStUpdated);
      if (code != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
      }
    }

    if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
      // remember the window under its hash key
      SSessionKey hashKey = {0};
      getSessionHashKey(&curWin.sessionWin, &hashKey);
      tSimpleHashPut(pAggSup->pResultRows, &hashKey, sizeof(SSessionKey), &curWin, sizeof(SResultWindowInfo));
    }

    row += winRows;
  }
}

5
54liuyao 已提交
3191
// For every row of `pBlock` (start ts, end ts, group id), repeatedly look up
// a session window in that range and delete it until no window is found.
// When `result` is not NULL every deleted key is appended to it.
static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SArray* result) {
  SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);

  TSKEY*    startDatas = (TSKEY*)pStartTsCol->pData;
  TSKEY*    endDatas = (TSKEY*)pEndTsCol->pData;
  uint64_t* gpDatas = (uint64_t*)pGroupCol->pData;

  for (int32_t row = 0; row < pBlock->info.rows; row++) {
    for (;;) {
      SSessionKey hit = {0};
      getCurSessionWindow(pAggSup, startDatas[row], endDatas[row], gpDatas[row], &hit);
      if (IS_INVALID_SESSION_WIN_KEY(hit)) {
        break;
      }

      doDeleteSessionWindow(pAggSup, &hit);
      if (result) {
        saveDeleteInfo(result, hit);
      }
    }
  }
}

5
54liuyao 已提交
3213 3214 3215 3216 3217 3218 3219 3220
static inline int32_t sessionKeyCompareAsc(const void* pKey1, const void* pKey2) {
  SSessionKey* pWin1 = (SSessionKey*)pKey1;
  SSessionKey* pWin2 = (SSessionKey*)pKey2;

  if (pWin1->groupId > pWin2->groupId) {
    return 1;
  } else if (pWin1->groupId < pWin2->groupId) {
    return -1;
5
54liuyao 已提交
3221 3222
  }

5
54liuyao 已提交
3223 3224 3225 3226 3227 3228 3229 3230 3231 3232 3233 3234 3235 3236 3237 3238 3239 3240 3241 3242 3243
  if (pWin1->win.skey > pWin2->win.skey) {
    return 1;
  } else if (pWin1->win.skey < pWin2->win.skey) {
    return -1;
  }

  return 0;
}

// Append the key of every entry in `pStUpdated` to `pUpdated`, then sort the
// array with sessionKeyCompareAsc. Always returns TSDB_CODE_SUCCESS.
static int32_t copyUpdateResult(SSHashObj* pStUpdated, SArray* pUpdated) {
  size_t  keyLen = 0;
  int32_t iter = 0;

  for (void* pEntry = tSimpleHashIterate(pStUpdated, NULL, &iter); pEntry != NULL;
       pEntry = tSimpleHashIterate(pStUpdated, pEntry, &iter)) {
    void* pEntryKey = tSimpleHashGetKey(pEntry, &keyLen);
    taosArrayPush(pUpdated, pEntryKey);
  }

  taosArraySort(pUpdated, sessionKeyCompareAsc);
  return TSDB_CODE_SUCCESS;
}

3244
// Fill `pBlock` with one row per key in the delete hash `pStDeleted`.
//
// `*Ite` is the iteration position inside the hash and is updated as entries
// are consumed. The hash is cleared once the iteration reaches its end; the
// loop stops early if the block has no room for another row.
//
// Per row: start and end columns both get the key's start timestamp, the
// group id column gets the key's group id, the uid and calculate-start/end
// columns are NULL, and the table name column gets the name returned by
// streamStateGetParName for the group (NULL when there is none).
void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite) {
  blockDataCleanup(pBlock);

  int32_t numOfDeleted = tSimpleHashGetSize(pStDeleted);
  if (numOfDeleted == 0) {
    return;
  }
  blockDataEnsureCapacity(pBlock, numOfDeleted);

  size_t  keyLen = 0;
  int32_t iter = 0;
  for (;;) {
    *Ite = tSimpleHashIterate(pStDeleted, *Ite, &iter);
    if ((*Ite) == NULL) {
      break;
    }
    if (pBlock->info.rows + 1 > pBlock->info.capacity) {
      break;
    }

    SSessionKey* pDelKey = tSimpleHashGetKey(*Ite, &keyLen);

    SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
    colDataAppend(pStartTsCol, pBlock->info.rows, (const char*)&pDelKey->win.skey, false);

    SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
    colDataAppend(pEndTsCol, pBlock->info.rows, (const char*)&pDelKey->win.skey, false);

    SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
    colDataAppendNULL(pUidCol, pBlock->info.rows);

    SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
    colDataAppend(pGpCol, pBlock->info.rows, (const char*)&pDelKey->groupId, false);

    SColumnInfoData* pCalStCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
    colDataAppendNULL(pCalStCol, pBlock->info.rows);

    SColumnInfoData* pCalEdCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
    colDataAppendNULL(pCalEdCol, pBlock->info.rows);

    SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);

    void* pParName = NULL;
    streamStateGetParName(pOp->pTaskInfo->streamInfo.pState, pDelKey->groupId, &pParName);
    if (pParName != NULL) {
      char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN];
      STR_WITH_MAXSIZE_TO_VARSTR(parTbName, pParName, sizeof(parTbName));
      colDataAppend(pTableCol, pBlock->info.rows, (const char*)parTbName, false);
      tdbFree(pParName);
    } else {
      colDataAppendNULL(pTableCol, pBlock->info.rows);
    }

    pBlock->info.rows += 1;
  }

  if ((*Ite) == NULL) {
    tSimpleHashClear(pStDeleted);
  }
}

5
54liuyao 已提交
3290 3291 3292 3293 3294 3295 3296 3297
// Rebuild each session window listed in `pWinArray` from the windows held by
// the child operators.
//
// For every key, each child's state is scanned from that key onwards; child
// windows fully contained in the key's range are folded into a parent window,
// which is created on the first match. The parent is merged with its
// neighbours and recorded in `pStUpdated` after every fold, and saved once at
// the end if at least one child window was folded in.
static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SSHashObj* pStUpdated) {
  SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
  SStreamAggSupporter*           pAggSup = &pInfo->streamAggSup;
  SExprSupp*                     pSup = &pOperator->exprSupp;
  SExecTaskInfo*                 pTaskInfo = pOperator->pTaskInfo;
  int32_t                        numOfOutput = pSup->numOfExprs;
  int32_t                        numOfWins = taosArrayGetSize(pWinArray);
  int32_t                        numOfChildren = taosArrayGetSize(pInfo->pChildren);

  for (int32_t w = 0; w < numOfWins; w++) {
    SSessionKey*      pWinKey = taosArrayGet(pWinArray, w);
    int32_t           merged = 0;  // child windows folded into parentWin so far
    SResultWindowInfo parentWin = {0};

    for (int32_t c = 0; c < numOfChildren; c++) {
      SOperatorInfo*                 pChild = taosArrayGetP(pInfo->pChildren, c);
      SStreamSessionAggOperatorInfo* pChInfo = pChild->info;
      SStreamAggSupporter*           pChAggSup = &pChInfo->streamAggSup;

      SSessionKey seekKey = {0};
      getSessionHashKey(pWinKey, &seekKey);
      SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pChAggSup->pState, &seekKey);

      SResultRow* pResult = NULL;
      SResultRow* pChResult = NULL;
      for (;;) {
        SResultWindowInfo childWin = {0};
        childWin.sessionWin = *pWinKey;
        int32_t code = getSessionWinBuf(pChAggSup, pCur, &childWin);

        // stop at the first child window that is missing or not inside the key's range
        bool covered = (code == TSDB_CODE_SUCCESS) && (pWinKey->win.skey <= childWin.sessionWin.win.skey) &&
                       (childWin.sessionWin.win.ekey <= pWinKey->win.ekey);
        if (!covered) {
          break;
        }

        if (merged == 0) {
          setSessionOutputBuf(pAggSup, pWinKey->win.skey, pWinKey->win.ekey, pWinKey->groupId, &parentWin);
          code = initSessionOutputBuf(&parentWin, &pResult, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
          if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
            break;
          }
        }
        merged++;

        updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin.sessionWin.win, true);
        initSessionOutputBuf(&childWin, &pChResult, pChild->exprSupp.pCtx, numOfOutput,
                             pChild->exprSupp.rowEntryInfoOffset);
        compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
        compactSessionWindow(pOperator, &parentWin, pStUpdated, NULL);
        saveResult(parentWin, pStUpdated);
      }

      streamStateFreeCur(pCur);
    }

    if (merged > 0) {
      saveSessionOutputBuf(pAggSup, &parentWin);
    }
  }
}

5
54liuyao 已提交
3344 3345 3346 3347 3348 3349 3350 3351 3352 3353 3354
// Remove from `pHashMap` every session window that isCloseWindow reports as
// closed. With the STREAM_TRIGGER_WINDOW_CLOSE trigger and a non-NULL
// `pClosed`, each such window is saved into `pClosed` before it is removed.
//
// Returns the first saveResult error, or TSDB_CODE_SUCCESS.
int32_t closeSessionWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SSHashObj* pClosed) {
  void*   pEntry = NULL;
  size_t  keyLen = 0;
  int32_t iter = 0;

  while ((pEntry = tSimpleHashIterate(pHashMap, pEntry, &iter)) != NULL) {
    SResultWindowInfo* pWinInfo = pEntry;
    if (!isCloseWindow(&pWinInfo->sessionWin.win, pTwSup)) {
      continue;
    }

    if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE && pClosed) {
      int32_t ret = saveResult(*pWinInfo, pClosed);
      if (ret != TSDB_CODE_SUCCESS) {
        return ret;
      }
    }

    // removal goes through the iterator-aware call, which is handed the iteration state
    SSessionKey* pEntryKey = tSimpleHashGetKey(pEntry, &keyLen);
    tSimpleHashIterateRemove(pHashMap, pEntryKey, sizeof(SSessionKey), &pEntry, &iter);
  }

  return TSDB_CODE_SUCCESS;
}

5
54liuyao 已提交
3364
// Raise each child operator's maxTs to at least `maxTs`, then drop the
// child's closed session windows (nothing is collected from them).
static void closeChildSessionWindow(SArray* pChildren, TSKEY maxTs) {
  int32_t numOfChildren = taosArrayGetSize(pChildren);
  int32_t idx = 0;
  while (idx < numOfChildren) {
    SOperatorInfo*                 pChildOp = taosArrayGetP(pChildren, idx);
    SStreamSessionAggOperatorInfo* pChInfo = pChildOp->info;

    if (pChInfo->twAggSup.maxTs < maxTs) {
      pChInfo->twAggSup.maxTs = maxTs;
    }
    closeSessionWindow(pChInfo->streamAggSup.pResultRows, &pChInfo->twAggSup, NULL);

    idx++;
  }
}

5
54liuyao 已提交
3374 3375 3376 3377
// Save every session window held in `pHashMap` into `pStUpdated`.
//
// Returns TSDB_CODE_SUCCESS when all windows were saved, otherwise the error
// from the first saveResult call that failed (iteration stops there, the same
// way closeSessionWindow handles that failure).
int32_t getAllSessionWindow(SSHashObj* pHashMap, SSHashObj* pStUpdated) {
  void*   pIte = NULL;
  int32_t iter = 0;
  while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
    SResultWindowInfo* pWinInfo = pIte;
    int32_t            code = saveResult(*pWinInfo, pStUpdated);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }
  return TSDB_CODE_SUCCESS;
}

5
54liuyao 已提交
3384
// Insert the hash-key form of every session key in `pResWins` into the
// delete hash `pStDeleted`. No value is stored with the keys.
static void copyDeleteWindowInfo(SArray* pResWins, SSHashObj* pStDeleted) {
  int32_t numOfWins = taosArrayGetSize(pResWins);
  int32_t idx = 0;
  while (idx < numOfWins) {
    SSessionKey* pSrcKey = taosArrayGet(pResWins, idx);
    idx++;
    if (pSrcKey == NULL) {
      continue;
    }

    SSessionKey hashKey = {0};
    getSessionHashKey(pSrcKey, &hashKey);
    tSimpleHashPut(pStDeleted, &hashKey, sizeof(SSessionKey), NULL, 0);
  }
}

5
54liuyao 已提交
3395 3396 3397
// Point the group result info at `pArrayList` and rewind its index to 0.
void initGroupResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList) {
  pGroupResInfo->index = 0;
  pGroupResInfo->pRows = pArrayList;
}

5
54liuyao 已提交
3400 3401 3402 3403 3404 3405 3406 3407 3408 3409
// Produce the next session result block into `pBlock`.
//
// The block is stamped with the task's version and cleaned first. If the
// group result info has remaining results, the block's group id is reset and
// the block is filled through buildSessionResultDataBlock; otherwise the
// result row array is destroyed and its pointer cleared.
void doBuildSessionResult(SOperatorInfo* pOperator, SStreamState* pState, SGroupResInfo* pGroupResInfo,
                          SSDataBlock* pBlock) {
  // set output datablock version
  pBlock->info.version = pOperator->pTaskInfo->version;
  blockDataCleanup(pBlock);

  if (hasRemainResults(pGroupResInfo)) {
    // clear the existed group id
    pBlock->info.id.groupId = 0;
    buildSessionResultDataBlock(pOperator, pState, pBlock, &pOperator->exprSupp, pGroupResInfo);
    return;
  }

  taosArrayDestroy(pGroupResInfo->pRows);
  pGroupResInfo->pRows = NULL;
}

5
54liuyao 已提交
3418
/**
 * getNext callback of the stream session aggregate operator ("single" and "final" variants).
 *
 * While the operator is not yet in OP_RES_TO_RETURN it drains the downstream operator,
 * feeds every block into the session aggregation and then switches to OP_RES_TO_RETURN.
 * That call and every later one hand out one block: the delete-result block while it has
 * rows, otherwise the aggregated result block. When both are empty the operator is marked
 * completed and NULL is returned.
 *
 * @param pOperator operator whose info is an SStreamSessionAggOperatorInfo
 * @return pInfo->pDelRes, pInfo->binfo.pRes, or NULL when nothing is left to return
 */
static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
  SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
  SStreamAggSupporter*           pAggSup = &pInfo->streamAggSup;
  SOptrBasicInfo*                pBInfo = &pInfo->binfo;
  SExprSupp*                     pSup = &pOperator->exprSupp;
  TSKEY                          latestTs = INT64_MIN;

  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  if (pOperator->status == OP_RES_TO_RETURN) {
    // input was consumed by an earlier call: keep handing out pending output, deletes first
    doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
    if (pInfo->pDelRes->info.rows > 0) {
      printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
      return pInfo->pDelRes;
    }

    doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes);
    if (pBInfo->pRes->info.rows > 0) {
      printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
      return pBInfo->pRes;
    }

    setOperatorCompleted(pOperator);
    return NULL;
  }

  // per-call scratch containers: set of touched windows and the list built from it
  SSHashObj*     pStUpdated = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
  SOperatorInfo* pSource = pOperator->pDownstream[0];
  SArray*        pUpdated = taosArrayInit(16, sizeof(SSessionKey));  // SResKeyPos

  for (;;) {
    SSDataBlock* pBlock = pSource->fpSet.getNextFn(pSource);
    if (pBlock == NULL) {
      break;
    }
    printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "final session recv" : "single session recv");

    if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
        pBlock->info.type == STREAM_CLEAR) {
      SArray* pDelWins = taosArrayInit(16, sizeof(SSessionKey));
      // gap must be 0
      doDeleteTimeWindows(pAggSup, pBlock, pDelWins);
      removeSessionResults(pStUpdated, pDelWins);
      if (IS_FINAL_OP(pInfo)) {
        // apply the same deletion to the child that produced the block, then rebuild
        int32_t                        childIdx = getChildIndex(pBlock);
        SOperatorInfo*                 pChild = taosArrayGetP(pInfo->pChildren, childIdx);
        SStreamSessionAggOperatorInfo* pChildInfo = pChild->info;
        // gap must be 0
        doDeleteTimeWindows(&pChildInfo->streamAggSup, pBlock, NULL);
        rebuildSessionWindow(pOperator, pDelWins, pStUpdated);
      }
      copyDeleteWindowInfo(pDelWins, pInfo->pStDeleted);
      taosArrayDestroy(pDelWins);
      continue;
    }

    if (pBlock->info.type == STREAM_GET_ALL) {
      getAllSessionWindow(pAggSup->pResultRows, pStUpdated);
      continue;
    }

    // evaluate the scalar expressions in place before aggregating, when there are any
    if (pInfo->scalarSupp.pExprInfo != NULL) {
      SExprSupp* pScalar = &pInfo->scalarSupp;
      projectApplyFunctions(pScalar->pExprInfo, pBlock, pBlock, pScalar->pCtx, pScalar->numOfExprs, NULL);
    }

    setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
    doStreamSessionAggImpl(pOperator, pBlock, pStUpdated, pInfo->pStDeleted, IS_FINAL_OP(pInfo));

    if (IS_FINAL_OP(pInfo)) {
      int32_t chIndex = getChildIndex(pBlock);
      int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren);
      // grow the child list until slot chIndex exists
      int32_t numToAdd = chIndex + 1 - numOfChildren;
      for (int32_t n = 0; n < numToAdd; ++n) {
        SOperatorInfo* pNewChild =
            createStreamFinalSessionAggOperatorInfo(NULL, pInfo->pPhyNode, pOperator->pTaskInfo, 0);
        if (pNewChild == NULL) {
          T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
        }
        taosArrayPush(pInfo->pChildren, &pNewChild);
      }

      // replay the block into the child operator that owns it
      SOperatorInfo* pOwner = taosArrayGetP(pInfo->pChildren, chIndex);
      setInputDataBlock(&pOwner->exprSupp, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
      doStreamSessionAggImpl(pOwner, pBlock, NULL, NULL, true);
    }

    latestTs = TMAX(latestTs, pBlock->info.window.ekey);
    latestTs = TMAX(latestTs, pBlock->info.watermark);
  }

  pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, latestTs);
  // from now on every call only returns pending output
  pOperator->status = OP_RES_TO_RETURN;

  closeSessionWindow(pAggSup->pResultRows, &pInfo->twAggSup, pStUpdated);
  closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs);
  copyUpdateResult(pStUpdated, pUpdated);
  removeSessionResults(pInfo->pStDeleted, pUpdated);
  tSimpleHashCleanup(pStUpdated);
  initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
  blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);

#if 0
  char* pBuf = streamStateSessionDump(pAggSup->pState);
  qDebug("===stream===final session%s", pBuf);
  taosMemoryFree(pBuf);
#endif

  doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
  if (pInfo->pDelRes->info.rows > 0) {
    printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
    return pInfo->pDelRes;
  }

  doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes);
  if (pBInfo->pRes->info.rows > 0) {
    printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
    return pBInfo->pRes;
  }

  setOperatorCompleted(pOperator);
  return NULL;
}

5
54liuyao 已提交
3536 3537 3538 3539 3540 3541 3542 3543 3544 3545 3546 3547 3548 3549 3550 3551 3552 3553 3554 3555
/**
 * Create the stream session-window aggregate operator for pPhyNode.
 *
 * @param downstream upstream operator to attach, or NULL to create a detached operator
 * @param pPhyNode   physical plan node; read as an SSessionWinodwPhysiNode
 * @param pTaskInfo  owning task; its `code` field receives the error code on failure
 * @return the new operator, or NULL on failure (everything allocated here is released
 *         through the _error path)
 */
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
                                                  SExecTaskInfo* pTaskInfo) {
  SSessionWinodwPhysiNode*       pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
  int32_t                        numOfCols = 0;
  int32_t                        code = TSDB_CODE_OUT_OF_MEMORY;
  SStreamSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamSessionAggOperatorInfo));
  SOperatorInfo*                 pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  pOperator->pTaskInfo = pTaskInfo;

  initResultSizeInfo(&pOperator->resultInfo, 4096);
  // optional scalar expressions attached to the window node
  if (pSessionNode->window.pExprs != NULL) {
    int32_t    numOfScalar = 0;
    SExprInfo* pScalarExprInfo = createExprInfo(pSessionNode->window.pExprs, NULL, &numOfScalar);
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }
  SExprSupp* pSup = &pOperator->exprSupp;

  SExprInfo*   pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols);
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
  code = initBasicInfoEx(&pInfo->binfo, pSup, pExprInfo, numOfCols, pResBlock);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  code = initStreamAggSupporter(&pInfo->streamAggSup, pSup->pCtx, numOfCols, pSessionNode->gap,
                                pTaskInfo->streamInfo.pState, 0, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->twAggSup = (STimeWindowAggSupp){
      .waterMark = pSessionNode->window.watermark,
      .calTrigger = pSessionNode->window.triggerType,
      .maxTs = INT64_MIN,
      .minTs = INT64_MAX,
  };

  initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);

  pInfo->primaryTsIndex = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
  // endTsIndex stays 0 (from calloc) when the node has no end-timestamp column
  if (pSessionNode->window.pTsEnd) {
    pInfo->endTsIndex = ((SColumnNode*)pSessionNode->window.pTsEnd)->slotId;
  }
  pInfo->binfo.pRes = pResBlock;
  pInfo->order = TSDB_ORDER_ASC;
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  pInfo->pStDeleted = tSimpleHashInit(64, hashFn);
  pInfo->pDelIterator = NULL;
  pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
  pInfo->pChildren = NULL;
  pInfo->isFinal = false;
  pInfo->pPhyNode = pPhyNode;
  pInfo->ignoreExpiredData = pSessionNode->window.igExpired;

  setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true,
                  OP_NOT_OPENED, pInfo, pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo,
                                         optrDefaultBufFn, NULL);

  if (downstream) {
    initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup);
    code = appendDownstream(pOperator, &downstream, 1);
    // Do not hand out an operator whose downstream could not be attached: its
    // getNext callbacks read pOperator->pDownstream[0] unconditionally.
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }
  return pOperator;

_error:
  if (pInfo != NULL) {
    destroyStreamSessionAggOperatorInfo(pInfo);
  }

  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

// Empty the in-memory result-row hash of the operator and clear the session
// data held by its stream state.
static void clearStreamSessionOperator(SStreamSessionAggOperatorInfo* pInfo) {
  SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;

  tSimpleHashClear(pAggSup->pResultRows);
  streamStateSessionClear(pAggSup->pState);
}

/**
 * getNext callback of the semi (partial) stream session aggregate operator.
 *
 * Every call first returns pending output: the result block while it has rows, then the
 * delete block. Once both are empty and the downstream has been exhausted
 * (OP_RES_TO_RETURN) the function contexts and the session state are cleared and the
 * operator completes. Otherwise the downstream is read: data blocks are aggregated,
 * STREAM_GET_ALL marks all cached windows as updated, and a delete/clear block stops
 * reading immediately so that its effect is emitted right away.
 *
 * @param pOperator operator whose info is an SStreamSessionAggOperatorInfo
 * @return pInfo->binfo.pRes, pInfo->pDelRes, or NULL when the operator is finished
 */
static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
  SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
  SOptrBasicInfo*                pBInfo = &pInfo->binfo;
  TSKEY                          maxTs = INT64_MIN;
  SExprSupp*                     pSup = &pOperator->exprSupp;
  SStreamAggSupporter*           pAggSup = &pInfo->streamAggSup;

  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  {
    // flush whatever an earlier call left behind before reading more input
    doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes);
    if (pBInfo->pRes->info.rows > 0) {
      printDataBlock(pBInfo->pRes, "semi session");
      return pBInfo->pRes;
    }

    doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
    if (pInfo->pDelRes->info.rows > 0) {
      printDataBlock(pInfo->pDelRes, "semi session delete");
      return pInfo->pDelRes;
    }

    if (pOperator->status == OP_RES_TO_RETURN) {
      clearFunctionContext(&pOperator->exprSupp);
      // semi interval operator clear disk buffer
      clearStreamSessionOperator(pInfo);
      setOperatorCompleted(pOperator);
      return NULL;
    }
  }

  _hash_fn_t     hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  SSHashObj*     pStUpdated = tSimpleHashInit(64, hashFn);
  SOperatorInfo* downstream = pOperator->pDownstream[0];
  SArray*        pUpdated = taosArrayInit(16, sizeof(SSessionKey));
  while (1) {
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
    if (pBlock == NULL) {
      // downstream exhausted: only pending output is left to return
      clearSpecialDataBlock(pInfo->pUpdateRes);
      pOperator->status = OP_RES_TO_RETURN;
      break;
    }
    printDataBlock(pBlock, "semi session recv");

    if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
        pBlock->info.type == STREAM_CLEAR) {
      // gap must be 0
      SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
      doDeleteTimeWindows(&pInfo->streamAggSup, pBlock, pWins);
      removeSessionResults(pStUpdated, pWins);
      copyDeleteWindowInfo(pWins, pInfo->pStDeleted);
      taosArrayDestroy(pWins);
      // stop reading; status is left untouched so a later call resumes the downstream
      break;
    } else if (pBlock->info.type == STREAM_GET_ALL) {
      getAllSessionWindow(pInfo->streamAggSup.pResultRows, pStUpdated);
      continue;
    }

    if (pInfo->scalarSupp.pExprInfo != NULL) {
      SExprSupp* pExprSup = &pInfo->scalarSupp;
      projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
    }
    setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
    doStreamSessionAggImpl(pOperator, pBlock, pStUpdated, NULL, false);
    // Accumulate over ALL blocks of this call. Comparing against
    // pInfo->twAggSup.maxTs here would keep only the last block's end key.
    maxTs = TMAX(maxTs, pBlock->info.window.ekey);
  }

  pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
  // publish the largest timestamp seen so far through the result block
  pBInfo->pRes->info.watermark = pInfo->twAggSup.maxTs;

  copyUpdateResult(pStUpdated, pUpdated);
  removeSessionResults(pInfo->pStDeleted, pUpdated);
  tSimpleHashCleanup(pStUpdated);

  initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
  blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);

#if 0
  char* pBuf = streamStateSessionDump(pAggSup->pState);
  qDebug("===stream===semi session%s", pBuf);
  taosMemoryFree(pBuf);
#endif

  doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes);
  if (pBInfo->pRes->info.rows > 0) {
    printDataBlock(pBInfo->pRes, "semi session");
    return pBInfo->pRes;
  }

  doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
  if (pInfo->pDelRes->info.rows > 0) {
    printDataBlock(pInfo->pDelRes, "semi session delete");
    return pInfo->pDelRes;
  }

  clearFunctionContext(&pOperator->exprSupp);
  // semi interval operator clear disk buffer
  clearStreamSessionOperator(pInfo);
  setOperatorCompleted(pOperator);
  return NULL;
}
3727

3728 3729
/**
 * Create the semi or final stream session aggregate operator for pPhyNode.
 *
 * The operator is built by createStreamSessionAggOperatorInfo() and then specialised:
 * for a node type other than QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION the semi
 * getNext callback is installed; numOfChild child operators are created recursively
 * (without downstream) and stored in pInfo->pChildren.
 *
 * @param downstream upstream operator, may be NULL
 * @param pPhyNode   physical plan node; its type selects the final or semi variant
 * @param pTaskInfo  owning task; its `code` field is set on failure
 * @param numOfChild number of child operators to pre-create
 * @return the new operator, or NULL on failure
 */
SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
                                                       SExecTaskInfo* pTaskInfo, int32_t numOfChild) {
  int32_t code = TSDB_CODE_OUT_OF_MEMORY;
  // Must be initialised before the first `goto _error`: the error path tests it,
  // and jumping over its initialisation would leave it indeterminate.
  SStreamSessionAggOperatorInfo* pInfo = NULL;
  SOperatorInfo*                 pOperator = createStreamSessionAggOperatorInfo(downstream, pPhyNode, pTaskInfo);
  if (pOperator == NULL) {
    goto _error;
  }

  pInfo = pOperator->info;

  pInfo->isFinal = (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION);
  char* name = (pInfo->isFinal) ? "StreamSessionFinalAggOperator" : "StreamSessionSemiAggOperator";

  if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
    // semi variant: own STREAM_CLEAR block and the semi getNext callback
    pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
    blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
    pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAgg, NULL,
                                           destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL);
  }

  setOperatorInfo(pOperator, name, pPhyNode->type, false, OP_NOT_OPENED, pInfo, pTaskInfo);

  pOperator->operatorType = pPhyNode->type;
  if (numOfChild > 0) {
    pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*));
    if (pInfo->pChildren == NULL) {
      goto _error;
    }
    for (int32_t i = 0; i < numOfChild; i++) {
      SOperatorInfo* pChildOp = createStreamFinalSessionAggOperatorInfo(NULL, pPhyNode, pTaskInfo, 0);
      if (pChildOp == NULL) {
        goto _error;
      }
      SStreamSessionAggOperatorInfo* pChInfo = pChildOp->info;
      pChInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
      // tag the child's stream state with its index
      streamStateSetNumber(pChInfo->streamAggSup.pState, i);
      taosArrayPush(pInfo->pChildren, &pChildOp);
    }
  }

  // semi operators and child-less final operators always trigger at once
  if (!IS_FINAL_OP(pInfo) || numOfChild == 0) {
    pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
  }

  return pOperator;

_error:
  if (pInfo != NULL) {
    destroyStreamSessionAggOperatorInfo(pInfo);
  }
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}
5
54liuyao 已提交
3779

3780
void destroyStreamStateOperatorInfo(void* param) {
X
Xiaoyu Wang 已提交
3781
  SStreamStateAggOperatorInfo* pInfo = (SStreamStateAggOperatorInfo*)param;
3782
  cleanupBasicInfo(&pInfo->binfo);
5
54liuyao 已提交
3783
  destroyStreamAggSupporter(&pInfo->streamAggSup);
5
54liuyao 已提交
3784 3785 3786 3787
  cleanupGroupResInfo(&pInfo->groupResInfo);
  if (pInfo->pChildren != NULL) {
    int32_t size = taosArrayGetSize(pInfo->pChildren);
    for (int32_t i = 0; i < size; i++) {
5
54liuyao 已提交
3788 3789
      SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, i);
      destroyOperatorInfo(pChild);
5
54liuyao 已提交
3790
    }
5
54liuyao 已提交
3791
    taosArrayDestroy(pInfo->pChildren);
5
54liuyao 已提交
3792
  }
5
54liuyao 已提交
3793 3794
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
  blockDataDestroy(pInfo->pDelRes);
5
54liuyao 已提交
3795
  tSimpleHashCleanup(pInfo->pSeDeleted);
D
dapan1121 已提交
3796
  taosMemoryFreeClear(param);
5
54liuyao 已提交
3797 3798 3799
}

// True when ts lies inside the closed interval [skey, ekey] of the window's session key.
bool isTsInWindow(SStateWindowInfo* pWin, TSKEY ts) {
  return pWin->winInfo.sessionWin.win.skey <= ts && ts <= pWin->winInfo.sessionWin.win.ekey;
}

// Compare pKeyData with the state key stored in pWin via compareVal().
// A NULL pKeyData never matches.
bool isEqualStateKey(SStateWindowInfo* pWin, char* pKeyData) {
  if (pKeyData == NULL) {
    return false;
  }
  return compareVal(pKeyData, pWin->pStateKey);
}

bool compareStateKey(void* data, void* key) {
  SStateKeys* stateKey = (SStateKeys*)key;
  stateKey->pData = (char*)key + sizeof(SStateKeys);
  return compareVal(data, stateKey);
}

/**
 * Prepare the state window for the row (ts, groupId, pKeyData) and look up its successor.
 *
 * pCurWin is seeded with the one-point window [ts, ts] and passed to
 * streamStateStateAddIfNotExist(), which provides the output buffer. The SStateKeys
 * header is taken from the last stateKeySize bytes of that buffer. When the call
 * returns TSDB_CODE_SUCCESS the window is flagged as already output; otherwise
 * pKeyData is copied into the key area of the buffer.
 *
 * pNextWin receives the session key returned for the cursor positioned after
 * pCurWin, or is marked invalid when that lookup fails.
 *
 * @param pAggSup  stream aggregate supporter (state handle, row and key sizes)
 * @param ts       timestamp of the row
 * @param groupId  group the row belongs to
 * @param pKeyData state-key value of the row
 * @param pCurWin  out: current window
 * @param pNextWin out: following window; its pOutputBuf is always NULL
 */
void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, char* pKeyData,
                       SStateWindowInfo* pCurWin, SStateWindowInfo* pNextWin) {
  int32_t bufLen = pAggSup->resultRowSize;

  pCurWin->winInfo.sessionWin.groupId = groupId;
  pCurWin->winInfo.sessionWin.win.skey = ts;
  pCurWin->winInfo.sessionWin.win.ekey = ts;
  int32_t code =
      streamStateStateAddIfNotExist(pAggSup->pState, &pCurWin->winInfo.sessionWin, pKeyData, pAggSup->stateKeySize,
                                    compareStateKey, &pCurWin->winInfo.pOutputBuf, &bufLen);

  // the state key occupies the tail of the result row buffer
  int32_t keyOffset = pAggSup->resultRowSize - pAggSup->stateKeySize;
  pCurWin->pStateKey = (SStateKeys*)((char*)pCurWin->winInfo.pOutputBuf + keyOffset);

  SStateKeys* pStateKey = pCurWin->pStateKey;
  pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys);
  pStateKey->type = pAggSup->stateKeyType;
  pStateKey->pData = (char*)pStateKey + sizeof(SStateKeys);
  pStateKey->isNull = false;

  if (code != TSDB_CODE_SUCCESS) {
    // store the key of this row in the buffer
    if (IS_VAR_DATA_TYPE(pAggSup->stateKeyType)) {
      varDataCopy(pStateKey->pData, pKeyData);
    } else {
      memcpy(pStateKey->pData, pKeyData, pStateKey->bytes);
    }
  } else {
    pCurWin->winInfo.isOutput = true;
  }

  // locate the window that follows the current one
  pNextWin->winInfo.sessionWin = pCurWin->winInfo.sessionWin;
  pNextWin->winInfo.pOutputBuf = NULL;
  SStreamStateCur* pNextCur = streamStateSessionSeekKeyNext(pAggSup->pState, &pCurWin->winInfo.sessionWin);
  code = streamStateSessionGetKVByCur(pNextCur, &pNextWin->winInfo.sessionWin, NULL, 0);
  if (code != TSDB_CODE_SUCCESS) {
    SET_SESSION_WIN_INVALID(pNextWin->winInfo);
  }
  streamStateFreeCur(pNextCur);
}

5
54liuyao 已提交
3852
int32_t updateStateWindowInfo(SStateWindowInfo* pWinInfo, SStateWindowInfo* pNextWin, TSKEY* pTs, uint64_t groupId,
H
Haojun Liao 已提交
3853
                              SColumnInfoData* pKeyCol, int32_t rows, int32_t start, bool* allEqual,
5
54liuyao 已提交
3854
                              SSHashObj* pResultRows, SSHashObj* pSeUpdated, SSHashObj* pSeDeleted) {
5
54liuyao 已提交
3855 3856 3857 3858
  *allEqual = true;
  for (int32_t i = start; i < rows; ++i) {
    char* pKeyData = colDataGetData(pKeyCol, i);
    if (!isTsInWindow(pWinInfo, pTs[i])) {
X
Xiaoyu Wang 已提交
3859
      if (isEqualStateKey(pWinInfo, pKeyData)) {
5
54liuyao 已提交
3860
        if (IS_VALID_SESSION_WIN(pNextWin->winInfo)) {
5
54liuyao 已提交
3861
          // ts belongs to the next window
5
54liuyao 已提交
3862
          if (pTs[i] >= pNextWin->winInfo.sessionWin.win.skey) {
5
54liuyao 已提交
3863 3864 3865 3866 3867 3868 3869
            return i - start;
          }
        }
      } else {
        return i - start;
      }
    }
5
54liuyao 已提交
3870 3871

    if (pWinInfo->winInfo.sessionWin.win.skey > pTs[i]) {
H
Haojun Liao 已提交
3872
      if (pSeDeleted && pWinInfo->winInfo.isOutput) {
5
54liuyao 已提交
3873
        saveDeleteRes(pSeDeleted, pWinInfo->winInfo.sessionWin);
5
54liuyao 已提交
3874
      }
5
54liuyao 已提交
3875 3876
      removeSessionResult(pSeUpdated, pResultRows, pWinInfo->winInfo.sessionWin);
      pWinInfo->winInfo.sessionWin.win.skey = pTs[i];
5
54liuyao 已提交
3877
    }
5
54liuyao 已提交
3878
    pWinInfo->winInfo.sessionWin.win.ekey = TMAX(pWinInfo->winInfo.sessionWin.win.ekey, pTs[i]);
5
54liuyao 已提交
3879 3880 3881 3882 3883 3884 3885
    if (!isEqualStateKey(pWinInfo, pKeyData)) {
      *allEqual = false;
    }
  }
  return rows - start;
}

5
54liuyao 已提交
3886 3887
/**
 * Aggregate one input block into state windows.
 *
 * The block is processed in runs of rows: for the row at the cursor the matching state
 * window is prepared, extended over as many following rows as possible, and the run is
 * aggregated into it. A run whose rows do not all share the window's state key is not
 * aggregated; instead its range is appended to pAggSup->pScanBlock and the window is
 * deleted. Expired rows are skipped one at a time when ignoreExpiredData is set.
 *
 * Aggregation or bookkeeping failures abort through T_LONG_JMP with
 * TSDB_CODE_OUT_OF_MEMORY.
 *
 * @param pOperator   state aggregate operator (info is SStreamStateAggOperatorInfo)
 * @param pSDataBlock input block; nothing is done when it has no column array
 * @param pSeUpdated  hash collecting the windows updated by this block
 * @param pStDeleted  hash collecting windows whose previous output must be deleted
 */
static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pSeUpdated,
                                 SSHashObj* pStDeleted) {
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
  SStreamStateAggOperatorInfo* pInfo = pOperator->info;
  int32_t                      numOfOutput = pOperator->exprSupp.numOfExprs;
  uint64_t                     groupId = pSDataBlock->info.id.groupId;
  int64_t                      code = TSDB_CODE_SUCCESS;
  TSKEY*                       tsCols = NULL;
  SResultRow*                  pResult = NULL;
  int32_t                      winRows = 0;
  if (pSDataBlock->pDataBlock != NULL) {
    SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
    tsCols = (int64_t*)pColDataInfo->pData;
  } else {
    return;
  }

  SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
  int32_t              rows = pSDataBlock->info.rows;
  blockDataEnsureCapacity(pAggSup->pScanBlock, rows);
  SColumnInfoData* pKeyColInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->stateCol.slotId);
  for (int32_t i = 0; i < rows; i += winRows) {
    if (pInfo->ignoreExpiredData && isOverdue(tsCols[i], &pInfo->twAggSup)) {
      // Skip exactly this one row. The loop increment adds winRows, so it must be
      // reset here; otherwise the size of the previous window would be added as
      // well and unprocessed rows would be skipped.
      winRows = 1;
      continue;
    }
    char*            pKeyData = colDataGetData(pKeyColInfo, i);
    bool             allEqual = true;
    SStateWindowInfo curWin = {0};
    SStateWindowInfo nextWin = {0};
    setStateOutputBuf(pAggSup, tsCols[i], groupId, pKeyData, &curWin, &nextWin);
    setSessionWinOutputInfo(pSeUpdated, &curWin.winInfo);
    winRows = updateStateWindowInfo(&curWin, &nextWin, tsCols, groupId, pKeyColInfo, rows, i, &allEqual,
                                    pAggSup->pResultRows, pSeUpdated, pStDeleted);
    if (!allEqual) {
      // mixed state keys inside the window: record its range in the scan block
      // and drop the window instead of aggregating into it
      uint64_t uid = 0;
      appendOneRowToStreamSpecialBlock(pAggSup->pScanBlock, &curWin.winInfo.sessionWin.win.skey,
                                       &curWin.winInfo.sessionWin.win.ekey, &uid, &groupId, NULL);
      tSimpleHashRemove(pSeUpdated, &curWin.winInfo.sessionWin, sizeof(SSessionKey));
      doDeleteSessionWindow(pAggSup, &curWin.winInfo.sessionWin);
      releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)curWin.winInfo.pOutputBuf);
      continue;
    }
    code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &curWin.winInfo, &pResult, i, winRows, rows, numOfOutput,
                              pOperator);
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }
    saveSessionOutputBuf(pAggSup, &curWin.winInfo);

    if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
      // at-once trigger: the window is reported as updated immediately
      code = saveResult(curWin.winInfo, pSeUpdated);
      if (code != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
      }
    }

    if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
      // window-close trigger: cache the window in pResultRows for later
      SSessionKey key = {0};
      getSessionHashKey(&curWin.winInfo.sessionWin, &key);
      tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
    }
  }
}

/**
 * getNext callback of the stream state-window aggregate operator.
 *
 * While the operator is not yet in OP_RES_TO_RETURN it drains the downstream operator,
 * aggregates every data block into state windows and then switches to OP_RES_TO_RETURN.
 * That call and every later one hand out one block: the delete-result block while it has
 * rows, otherwise the aggregated result block. When both are empty the operator is
 * marked completed and NULL is returned.
 *
 * @param pOperator operator whose info is an SStreamStateAggOperatorInfo
 * @return pInfo->pDelRes, pInfo->binfo.pRes, or NULL when nothing is left to return
 */
static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SStreamStateAggOperatorInfo* pInfo = pOperator->info;
  SOptrBasicInfo*              pBInfo = &pInfo->binfo;
  SExprSupp*                   pSup = &pOperator->exprSupp;
  int64_t                      latestTs = INT64_MIN;

  if (pOperator->status == OP_RES_TO_RETURN) {
    // input was consumed by an earlier call: keep handing out pending output, deletes first
    doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
    if (pInfo->pDelRes->info.rows > 0) {
      printDataBlock(pInfo->pDelRes, "single state delete");
      return pInfo->pDelRes;
    }

    doBuildSessionResult(pOperator, pInfo->streamAggSup.pState, &pInfo->groupResInfo, pBInfo->pRes);
    if (pBInfo->pRes->info.rows > 0) {
      printDataBlock(pBInfo->pRes, "single state");
      return pBInfo->pRes;
    }

    setOperatorCompleted(pOperator);
    return NULL;
  }

  // per-call scratch containers: set of touched windows and the list built from it
  SSHashObj*     pSeUpdated = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
  SOperatorInfo* pSource = pOperator->pDownstream[0];
  SArray*        pUpdated = taosArrayInit(16, sizeof(SSessionKey));

  for (;;) {
    SSDataBlock* pBlock = pSource->fpSet.getNextFn(pSource);
    if (pBlock == NULL) {
      break;
    }
    printDataBlock(pBlock, "single state recv");

    if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
        pBlock->info.type == STREAM_CLEAR) {
      // drop the affected windows and remember them for the delete-result block
      SArray* pDelWins = taosArrayInit(16, sizeof(SSessionKey));
      doDeleteTimeWindows(&pInfo->streamAggSup, pBlock, pDelWins);
      removeSessionResults(pSeUpdated, pDelWins);
      copyDeleteWindowInfo(pDelWins, pInfo->pSeDeleted);
      taosArrayDestroy(pDelWins);
      continue;
    }

    if (pBlock->info.type == STREAM_GET_ALL) {
      getAllSessionWindow(pInfo->streamAggSup.pResultRows, pSeUpdated);
      continue;
    }

    // evaluate the scalar expressions in place before aggregating, when there are any
    if (pInfo->scalarSupp.pExprInfo != NULL) {
      SExprSupp* pScalar = &pInfo->scalarSupp;
      projectApplyFunctions(pScalar->pExprInfo, pBlock, pBlock, pScalar->pCtx, pScalar->numOfExprs, NULL);
    }

    setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
    doStreamStateAggImpl(pOperator, pBlock, pSeUpdated, pInfo->pSeDeleted);
    latestTs = TMAX(latestTs, pBlock->info.window.ekey);
  }

  pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, latestTs);
  // from now on every call only returns pending output
  pOperator->status = OP_RES_TO_RETURN;

  closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pSeUpdated);
  copyUpdateResult(pSeUpdated, pUpdated);
  removeSessionResults(pInfo->pSeDeleted, pUpdated);
  tSimpleHashCleanup(pSeUpdated);

  initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
  blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);

#if 0
  char* pBuf = streamStateSessionDump(pInfo->streamAggSup.pState);
  qDebug("===stream===final session%s", pBuf);
  taosMemoryFree(pBuf);
#endif

  doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
  if (pInfo->pDelRes->info.rows > 0) {
    printDataBlock(pInfo->pDelRes, "single state delete");
    return pInfo->pDelRes;
  }

  doBuildSessionResult(pOperator, pInfo->streamAggSup.pState, &pInfo->groupResInfo, pBInfo->pRes);
  if (pBInfo->pRes->info.rows > 0) {
    printDataBlock(pBInfo->pRes, "single state");
    return pBInfo->pRes;
  }

  setOperatorCompleted(pOperator);
  return NULL;
}

X
Xiaoyu Wang 已提交
4044 4045 4046 4047 4048
/*
 * Create the stream state-window aggregate operator.
 *
 *   downstream  input operator; attached as the only child through appendDownstream()
 *   pPhyNode    physical plan node, interpreted as SStreamStateWinodwPhysiNode
 *   pTaskInfo   owning task; pTaskInfo->code receives the error code on failure
 *
 * Returns the new operator, or NULL when any allocation / initialization step fails.
 * On failure everything built so far is released through destroyStreamStateOperatorInfo()
 * (only when the info struct itself was allocated) and the operator shell is freed.
 */
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
                                                SExecTaskInfo* pTaskInfo) {
  SStreamStateWinodwPhysiNode* pStateNode = (SStreamStateWinodwPhysiNode*)pPhyNode;
  int32_t                      tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
  SColumnNode*                 pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr;
  int32_t                      code = TSDB_CODE_SUCCESS;

  SStreamStateAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamStateAggOperatorInfo));
  SOperatorInfo*               pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  pInfo->stateCol = extractColumnFromColumnNode(pColNode);
  initResultSizeInfo(&pOperator->resultInfo, 4096);

  // optional scalar expressions that are evaluated on the input block before aggregation
  if (pStateNode->window.pExprs != NULL) {
    int32_t    numOfScalar = 0;
    SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalar);
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

  pInfo->twAggSup = (STimeWindowAggSupp){
      .waterMark = pStateNode->window.watermark,
      .calTrigger = pStateNode->window.triggerType,
      .maxTs = INT64_MIN,
      .minTs = INT64_MAX,
  };

  initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);

  SExprSupp*   pSup = &pOperator->exprSupp;
  int32_t      numOfCols = 0;
  SExprInfo*   pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &numOfCols);
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
  code = initBasicInfoEx(&pInfo->binfo, pSup, pExprInfo, numOfCols, pResBlock);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  // the state key buffer holds the SStateKeys header followed by the state column value
  int32_t keySize = sizeof(SStateKeys) + pColNode->node.resType.bytes;
  int16_t type = pColNode->node.resType.type;
  code = initStreamAggSupporter(&pInfo->streamAggSup, pSup->pCtx, numOfCols, 0, pTaskInfo->streamInfo.pState, keySize,
                                type);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->primaryTsIndex = tsSlotId;
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  pInfo->pSeDeleted = tSimpleHashInit(64, hashFn);
  pInfo->pDelIterator = NULL;
  pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
  pInfo->pChildren = NULL;
  pInfo->ignoreExpiredData = pStateNode->window.igExpired;

  setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo,
                                         optrDefaultBufFn, NULL);
  initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup);
  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
  return pOperator;

_error:
  // pInfo is NULL when its allocation failed; never hand a NULL pointer to the destructor
  if (pInfo != NULL) {
    destroyStreamStateOperatorInfo(pInfo);
  }
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}
4119

4120
void destroyMAIOperatorInfo(void* param) {
4121
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = (SMergeAlignedIntervalAggOperatorInfo*)param;
4122
  destroyIntervalOperatorInfo(miaInfo->intervalAggOperatorInfo);
D
dapan1121 已提交
4123
  taosMemoryFreeClear(param);
4124 4125
}

4126
/*
 * Fetch a fresh result row from the aggregate supporter's result buffer and, on success,
 * record its page position as the current position of pResultRowInfo.
 * Returns the row, or NULL when getNewResultRow() fails (pResultRowInfo is left untouched).
 */
static SResultRow* doSetSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, SAggSupporter* pSup) {
  SResultRow* pRow = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
  if (pRow != NULL) {
    pResultRowInfo->cur = (SResultRowPosition){.pageId = pRow->pageId, .offset = pRow->offset};
  }
  return pRow;
}
4134

4135 4136 4137 4138 4139 4140 4141 4142
/*
 * Make sure *pResult points at a usable result row and bind it to the given time window.
 *
 * When *pResult is NULL a new row is obtained through doSetSingleOutputTupleBuf(); if that
 * fails, terrno is returned and *pResult stays NULL. Otherwise the row's window is set to
 * *win and the function contexts in pExprSup are initialised against the row.
 */
static int32_t setSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult,
                                       SExprSupp* pExprSup, SAggSupporter* pAggSup) {
  SResultRow* pRow = *pResult;
  if (pRow == NULL) {
    pRow = doSetSingleOutputTupleBuf(pResultRowInfo, pAggSup);
    *pResult = pRow;
    if (pRow == NULL) {
      return terrno;
    }
  }

  // bind the time window to the (possibly reused) result row
  pRow->win = *win;
  setResultRowInitCtx(pRow, pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
  return TSDB_CODE_SUCCESS;
}

4150
/*
 * Aggregate one input block for the merge-aligned interval operator.
 *
 * Rows are consumed in runs that share the same timestamp; every run is one window whose
 * start key is that timestamp. Whenever the timestamp changes, the rows of the finished run
 * are applied to the aggregate functions, the single shared result row is finalized into
 * pResultBlock and then reset for the next window. The last run of the block is applied but
 * NOT finalized here: its window stays open (miaInfo->curTs) so that the next block, or the
 * caller at end of input, can close it.
 *
 * Long-jumps to pTaskInfo->env when a result row cannot be set up.
 */
static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo,
                                          SSDataBlock* pBlock, SSDataBlock* pResultBlock) {
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
  SIntervalAggOperatorInfo*             iaInfo = miaInfo->intervalAggOperatorInfo;
  SExecTaskInfo*                        pTaskInfo = pOperatorInfo->pTaskInfo;
  SExprSupp*                            pSup = &pOperatorInfo->exprSupp;
  SInterval*                            pInterval = &iaInfo->interval;

  int32_t  runStart = 0;
  int64_t* tsCols = extractTsCol(pBlock, iaInfo);
  TSKEY    firstTs = getStartTsKey(&pBlock->info.window, tsCols);

  // a window is still open from a previous block and this block starts at another timestamp: close it
  if (miaInfo->curTs != INT64_MIN && firstTs != miaInfo->curTs) {
    finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
    resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
  }
  miaInfo->curTs = firstTs;

  STimeWindow win = {0};
  win.skey = miaInfo->curTs;
  win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;

  int32_t ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
  if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  STimeWindow currWin = win;
  int32_t     currPos;
  for (currPos = runStart + 1; currPos < pBlock->info.rows; ++currPos) {
    if (tsCols[currPos] == miaInfo->curTs) {
      continue;  // still inside the current run
    }

    // timestamp changed: aggregate the finished run [runStart, currPos) and emit its window
    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true);
    applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, runStart,
                                    currPos - runStart, pBlock->info.rows, pSup->numOfExprs);

    finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
    resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));

    // open the next window at the new timestamp
    miaInfo->curTs = tsCols[currPos];
    currWin.skey = miaInfo->curTs;
    currWin.ekey = taosTimeAdd(currWin.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
    runStart = currPos;

    // NOTE(review): the row is re-bound with the block's first window (win), not currWin; the window
    // handed to the functions comes from currWin via updateTimeWindowInfo() -- confirm this is intended.
    ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
    if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    miaInfo->curTs = currWin.skey;
  }

  // trailing run: aggregate it but leave the window open
  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true);
  applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, runStart,
                                  currPos - runStart, pBlock->info.rows, pSup->numOfExprs);
}

4217
/*
 * Finish the current group: stamp the group id on the result block, then clear the
 * per-group state (no open window, no current group) of the operator.
 */
static void cleanupAfterGroupResultGen(SMergeAlignedIntervalAggOperatorInfo* pMiaInfo, SSDataBlock* pRes) {
  pMiaInfo->curTs = INT64_MIN;

  // the id must be copied out before it is cleared
  pRes->info.id.groupId = pMiaInfo->groupId;
  pMiaInfo->groupId = 0;
}

4223
/*
 * Pull blocks from downstream and aggregate them into the operator's result block until
 * one of the following happens:
 *   - downstream is exhausted (the open window is closed, operator marked completed);
 *   - a block of a different group arrives (the open window is closed, the block is kept
 *     in prefetchedBlock for the next call);
 *   - the result block reaches the configured capacity.
 */
static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
  SExecTaskInfo*                        pTaskInfo = pOperator->pTaskInfo;
  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
  SIntervalAggOperatorInfo*             pIaInfo = pMiaInfo->intervalAggOperatorInfo;

  SExprSupp*      pSup = &pOperator->exprSupp;
  SSDataBlock*    pRes = pIaInfo->binfo.pRes;
  SResultRowInfo* pResultRowInfo = &pIaInfo->binfo.resultRowInfo;
  SOperatorInfo*  downstream = pOperator->pDownstream[0];
  int32_t         scanFlag = MAIN_SCAN;

  for (;;) {
    // prefer the block that was parked by the previous call, otherwise read a new one
    SSDataBlock* pBlock = pMiaInfo->prefetchedBlock;
    if (pBlock != NULL) {
      pMiaInfo->prefetchedBlock = NULL;
      pMiaInfo->groupId = pBlock->info.id.groupId;
    } else {
      pBlock = downstream->fpSet.getNextFn(downstream);
    }

    // no more input: flush the window that is still open, then finish
    if (pBlock == NULL) {
      if (pMiaInfo->curTs != INT64_MIN) {
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
      }

      setOperatorCompleted(pOperator);
      break;
    }

    const bool sameGroup = (pMiaInfo->groupId == pBlock->info.id.groupId);
    if (pMiaInfo->groupId == 0) {
      // no group recorded yet: adopt the group of this block
      if (!sameGroup) {
        pMiaInfo->groupId = pBlock->info.id.groupId;
        pRes->info.id.groupId = pMiaInfo->groupId;
      }
    } else if (!sameGroup) {
      // group switch: close the window of the previous group and park this block
      ASSERT(pMiaInfo->curTs != INT64_MIN);
      finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
      resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));

      pMiaInfo->prefetchedBlock = pBlock;
      cleanupAfterGroupResultGen(pMiaInfo, pRes);
      break;
    } else {
      // same group as before: keep going
      pRes->info.id.groupId = pMiaInfo->groupId;
    }

    getTableScanInfo(pOperator, &pIaInfo->inputOrder, &scanFlag);
    setInputDataBlock(pSup, pBlock, pIaInfo->inputOrder, scanFlag, true);
    doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes);

    doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
    if (pRes->info.rows >= pOperator->resultInfo.capacity) {
      break;
    }
  }
}

/*
 * getNext entry point of the merge-aligned interval operator.
 *
 * Clears the result block and fills it through doMergeAlignedIntervalAgg(). When result
 * merging is enabled the fill step is repeated until the row threshold is reached or the
 * operator is done; otherwise it runs exactly once.
 * Returns the result block, or NULL when it holds no rows / the operator is already done.
 */
static SSDataBlock* mergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
  SIntervalAggOperatorInfo*             iaInfo = pMiaInfo->intervalAggOperatorInfo;
  SSDataBlock*                          pRes = iaInfo->binfo.pRes;

  blockDataCleanup(pRes);

  if (!iaInfo->binfo.mergeResultBlock) {
    doMergeAlignedIntervalAgg(pOperator);
  } else {
    while (pOperator->status != OP_EXEC_DONE && pRes->info.rows < pOperator->resultInfo.threshold) {
      doMergeAlignedIntervalAgg(pOperator);
    }
  }

  size_t rows = pRes->info.rows;
  pOperator->resultInfo.totalRows += rows;
  if (rows == 0) {
    return NULL;
  }
  return pRes;
}

4324
SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode,
4325
                                                      SExecTaskInfo* pTaskInfo) {
4326
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedIntervalAggOperatorInfo));
4327
  SOperatorInfo*                        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
4328 4329 4330 4331
  if (miaInfo == NULL || pOperator == NULL) {
    goto _error;
  }

D
dapan1121 已提交
4332 4333 4334 4335 4336
  miaInfo->intervalAggOperatorInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
  if (miaInfo->intervalAggOperatorInfo == NULL) {
    goto _error;
  }

4337 4338 4339 4340 4341 4342 4343
  SInterval interval = {.interval = pNode->interval,
                        .sliding = pNode->sliding,
                        .intervalUnit = pNode->intervalUnit,
                        .slidingUnit = pNode->slidingUnit,
                        .offset = pNode->offset,
                        .precision = ((SColumnNode*)pNode->window.pTspk)->node.resType.precision};

D
dapan1121 已提交
4344
  SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
4345
  SExprSupp*                pSup = &pOperator->exprSupp;
4346

H
Haojun Liao 已提交
4347 4348 4349 4350 4351
  int32_t code = filterInitFromNode((SNode*)pNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

L
Liu Jicong 已提交
4352 4353 4354 4355
  miaInfo->curTs = INT64_MIN;
  iaInfo->win = pTaskInfo->window;
  iaInfo->inputOrder = TSDB_ORDER_ASC;
  iaInfo->interval = interval;
4356 4357
  iaInfo->primaryTsIndex = ((SColumnNode*)pNode->window.pTspk)->slotId;
  iaInfo->binfo.mergeResultBlock = pNode->window.mergeDataBlock;
4358 4359

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
H
Haojun Liao 已提交
4360
  initResultSizeInfo(&pOperator->resultInfo, 512);
4361

H
Haojun Liao 已提交
4362 4363
  int32_t    num = 0;
  SExprInfo* pExprInfo = createExprInfo(pNode->window.pFuncs, NULL, &num);
H
Haojun Liao 已提交
4364

L
Liu Jicong 已提交
4365 4366
  code = initAggSup(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
                    pTaskInfo->streamInfo.pState);
4367 4368 4369
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
4370

H
Haojun Liao 已提交
4371
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pNode->window.node.pOutputDataBlockDesc);
4372
  initBasicInfo(&iaInfo->binfo, pResBlock);
4373
  initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);
4374

4375
  iaInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, num, iaInfo);
4376
  if (iaInfo->timeWindowInterpo) {
4377
    iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
4378 4379
  }

4380
  initResultRowInfo(&iaInfo->binfo.resultRowInfo);
4381
  blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity);
L
Liu Jicong 已提交
4382 4383
  setOperatorInfo(pOperator, "TimeMergeAlignedIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL,
                  false, OP_NOT_OPENED, miaInfo, pTaskInfo);
4384

L
Liu Jicong 已提交
4385 4386
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignedIntervalAgg, NULL, destroyMAIOperatorInfo,
                                         optrDefaultBufFn, NULL);
4387 4388 4389 4390 4391 4392 4393 4394 4395

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
4396
  destroyMAIOperatorInfo(miaInfo);
4397 4398 4399 4400
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}
4401 4402 4403 4404 4405

//=====================================================================================================================
// merge interval operator
typedef struct SMergeIntervalAggOperatorInfo {
  SIntervalAggOperatorInfo intervalAggOperatorInfo;
L
Liu Jicong 已提交
4406 4407 4408 4409 4410 4411
  SList*                   groupIntervals;
  SListIter                groupIntervalsIter;
  bool                     hasGroupId;
  uint64_t                 groupId;
  SSDataBlock*             prefetchedBlock;
  bool                     inputBlocksFinished;
4412 4413
} SMergeIntervalAggOperatorInfo;

S
slzhou 已提交
4414
typedef struct SGroupTimeWindow {
L
Liu Jicong 已提交
4415
  uint64_t    groupId;
S
slzhou 已提交
4416 4417 4418
  STimeWindow window;
} SGroupTimeWindow;

4419
void destroyMergeIntervalOperatorInfo(void* param) {
4420
  SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
S
slzhou 已提交
4421
  tdListFree(miaInfo->groupIntervals);
4422
  destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo);
4423

D
dapan1121 已提交
4424
  taosMemoryFreeClear(param);
4425 4426
}

L
Liu Jicong 已提交
4427 4428
/*
 * Drop the result-row hash entry of window `win` for group `tableGroupId`.
 *
 * The hash key is built from the window start key and the group id. The entry is expected
 * to exist (asserted). Emitting the row into pResultBlock is currently disabled, so the
 * block is not touched. Always returns TSDB_CODE_SUCCESS.
 */
static int32_t finalizeWindowResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, STimeWindow* win,
                                    SSDataBlock* pResultBlock) {
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;

  SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &win->skey, TSDB_KEYSIZE, tableGroupId);

  SResultRowPosition* pPos = (SResultRowPosition*)tSimpleHashGet(
      iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
  ASSERT(pPos != NULL);

  //  finalizeResultRows(iaInfo->aggSup.pResultBuf, pPos, pResultBlock, pTaskInfo);
  tSimpleHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
  return TSDB_CODE_SUCCESS;
}

4444 4445 4446 4447
/*
 * Register the freshly closed window `newWin` of group `tableGroupId` and retire every
 * previously tracked window of the same group that lies completely before it in scan
 * order (ascending: newWin->skey > prev ekey; descending: newWin->skey < prev ekey).
 *
 * Emitting the retired windows into pResultBlock is currently disabled, so the block is
 * not touched. A retired node is unlinked from the list AND freed; unlinking alone would
 * leak the node. Always returns 0.
 */
static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock,
                                        STimeWindow* newWin) {
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
  bool                           ascScan = (iaInfo->inputOrder == TSDB_ORDER_ASC);

  SGroupTimeWindow groupTimeWindow = {.groupId = tableGroupId, .window = *newWin};
  tdListAppend(miaInfo->groupIntervals, &groupTimeWindow);

  SListIter iter = {0};
  tdListInitIter(miaInfo->groupIntervals, &iter, TD_LIST_FORWARD);
  SListNode* listNode = NULL;
  while ((listNode = tdListNext(&iter)) != NULL) {
    SGroupTimeWindow* prevGrpWin = (SGroupTimeWindow*)listNode->data;
    if (prevGrpWin->groupId != tableGroupId) {
      continue;
    }

    STimeWindow* prevWin = &prevGrpWin->window;
    if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) {
      //      finalizeWindowResult(pOperatorInfo, tableGroupId, prevWin, pResultBlock);
      // prevWin points into the node, so it must not be used after this point
      SListNode* pRetired = tdListPopNode(miaInfo->groupIntervals, listNode);
      taosMemoryFreeClear(pRetired);
    }
  }

  return 0;
}

/*
 * Aggregate one input block for the merge interval operator.
 *
 * The window covering the first timestamp of the block is processed first, then every
 * following window that getNextQualifiedWindow() reports for the block. For each window:
 * set up its result row, count the rows of the block that fall into it, optionally
 * interpolate at the window borders, apply the aggregate functions, close the window and
 * register it through outputPrevIntervalResult().
 *
 * Long-jumps to pTaskInfo->env with TSDB_CODE_OUT_OF_MEMORY when a result row cannot be set up.
 */
static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
                                   int32_t scanFlag, SSDataBlock* pResultBlock) {
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
  SExecTaskInfo*                 pTaskInfo = pOperatorInfo->pTaskInfo;
  SExprSupp*                     pExprSup = &pOperatorInfo->exprSupp;

  const bool isMainScan = (scanFlag == MAIN_SCAN);
  const bool ascScan = (iaInfo->inputOrder == TSDB_ORDER_ASC);

  int32_t     startPos = 0;
  int32_t     numOfOutput = pExprSup->numOfExprs;
  int64_t*    tsCols = extractTsCol(pBlock, iaInfo);
  uint64_t    tableGroupId = pBlock->info.id.groupId;
  TSKEY       blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
  SResultRow* pResult = NULL;

  // ---- first window of the block ----
  STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval,
                                        iaInfo->inputOrder);

  int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, isMainScan, &pResult, tableGroupId, pExprSup->pCtx,
                                       numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
  if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
  }

  // the boundary key that limits the window in scan direction
  TSKEY   ekey = ascScan ? win.ekey : win.skey;
  int32_t forwardRows =
      getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->inputOrder);
  ASSERT(forwardRows > 0);

  if (iaInfo->timeWindowInterpo) {
    // windows opened earlier have not been interpolated yet
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId);
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);

    // switch back to the current window
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, isMainScan, &pResult, tableGroupId, pExprSup->pCtx,
                                 numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }

    // interpolate at the start key of the window
    doWindowBorderInterpolation(iaInfo, pBlock, pResult, &win, startPos, forwardRows, pExprSup);
  }

  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, true);
  applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows,
                                  pBlock->info.rows, numOfOutput);
  doCloseWindow(pResultRowInfo, iaInfo, pResult);

  // this window is closed now: windows before it can be retired
  outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &win);

  // ---- remaining windows of the block ----
  STimeWindow nextWin = win;
  for (;;) {
    int32_t prevEndPos = forwardRows - 1 + startPos;
    startPos =
        getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, iaInfo->inputOrder);
    if (startPos < 0) {
      break;  // no further window in this block
    }

    // a NULL row means the buffer could not provide more memory
    int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, isMainScan, &pResult, tableGroupId, pExprSup->pCtx,
                                          numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }

    ekey = ascScan ? nextWin.ekey : nextWin.skey;
    forwardRows =
        getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->inputOrder);

    // interpolate at the start (end) key of the window
    doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup);

    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, true);
    applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows,
                                    pBlock->info.rows, numOfOutput);
    doCloseWindow(pResultRowInfo, iaInfo, pResult);

    // nextWin is closed now: windows before it can be retired
    outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &nextWin);
  }

  if (iaInfo->timeWindowInterpo) {
    saveDataBlockLastRow(iaInfo->pPrevValues, pBlock, iaInfo->pInterpCols);
  }
}

/*
 * getNext entry point of the merge interval operator.
 *
 * While input is available, blocks of one group are aggregated until the group changes
 * (the new block is parked in prefetchedBlock), the row threshold is reached, or
 * downstream is exhausted. After the input has ended, every call advances the iterator
 * over the tracked group windows by one entry and stamps that entry's group id on the
 * result block.
 * Returns the result block, or NULL (and marks the operator completed) when it is empty.
 */
static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SMergeIntervalAggOperatorInfo* miaInfo = pOperator->info;
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
  SExprSupp*                     pExpSupp = &pOperator->exprSupp;
  SSDataBlock*                   pRes = iaInfo->binfo.pRes;

  blockDataCleanup(pRes);
  blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);

  if (!miaInfo->inputBlocksFinished) {
    SOperatorInfo* downstream = pOperator->pDownstream[0];
    int32_t        scanFlag = MAIN_SCAN;

    for (;;) {
      // a block parked by the previous call takes precedence over reading a new one
      SSDataBlock* pBlock = miaInfo->prefetchedBlock;
      if (pBlock != NULL) {
        miaInfo->groupId = pBlock->info.id.groupId;
        miaInfo->prefetchedBlock = NULL;
      } else {
        pBlock = downstream->fpSet.getNextFn(downstream);
      }

      if (pBlock == NULL) {
        // end of input: prepare the walk over the tracked group windows
        tdListInitIter(miaInfo->groupIntervals, &miaInfo->groupIntervalsIter, TD_LIST_FORWARD);
        miaInfo->inputBlocksFinished = true;
        break;
      }

      if (!miaInfo->hasGroupId) {
        miaInfo->hasGroupId = true;
        miaInfo->groupId = pBlock->info.id.groupId;
      } else if (miaInfo->groupId != pBlock->info.id.groupId) {
        // next group has started: keep its first block for the following call
        miaInfo->prefetchedBlock = pBlock;
        break;
      }

      getTableScanInfo(pOperator, &iaInfo->inputOrder, &scanFlag);
      setInputDataBlock(pExpSupp, pBlock, iaInfo->inputOrder, scanFlag, true);
      doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes);

      if (pRes->info.rows >= pOperator->resultInfo.threshold) {
        break;
      }
    }

    pRes->info.id.groupId = miaInfo->groupId;
  }

  if (miaInfo->inputBlocksFinished) {
    SListNode* listNode = tdListNext(&miaInfo->groupIntervalsIter);
    if (listNode != NULL) {
      SGroupTimeWindow* grpWin = (SGroupTimeWindow*)(listNode->data);
      //      finalizeWindowResult(pOperator, grpWin->groupId, &grpWin->window, pRes);
      pRes->info.id.groupId = grpWin->groupId;
    }
  }

  size_t rows = pRes->info.rows;
  if (rows == 0) {
    setOperatorCompleted(pOperator);
  }

  pOperator->resultInfo.totalRows += rows;
  return (rows == 0) ? NULL : pRes;
}

4638 4639 4640
/*
 * Create the merge interval aggregate operator.
 *
 *   downstream        input operator; attached as the only child through appendDownstream()
 *   pIntervalPhyNode  physical plan node describing the interval, functions and output
 *   pTaskInfo         owning task; pTaskInfo->code receives the error code on failure
 *
 * Returns the new operator, or NULL on failure. `code` is declared before the first
 * `goto _error` so that the error path never reads an uninitialised value, and every
 * allocation failure reports TSDB_CODE_OUT_OF_MEMORY instead of a stale success code.
 */
SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode,
                                               SExecTaskInfo* pTaskInfo) {
  int32_t code = TSDB_CODE_SUCCESS;

  SMergeIntervalAggOperatorInfo* pMergeIntervalInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo));
  SOperatorInfo*                 pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pMergeIntervalInfo == NULL || pOperator == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  // list of (group, window) pairs tracked while aggregating; checked before anything else is built
  pMergeIntervalInfo->groupIntervals = tdListNew(sizeof(SGroupTimeWindow));
  if (pMergeIntervalInfo->groupIntervals == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  int32_t    num = 0;
  SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);

  SInterval interval = {.interval = pIntervalPhyNode->interval,
                        .sliding = pIntervalPhyNode->sliding,
                        .intervalUnit = pIntervalPhyNode->intervalUnit,
                        .slidingUnit = pIntervalPhyNode->slidingUnit,
                        .offset = pIntervalPhyNode->offset,
                        .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};

  SIntervalAggOperatorInfo* pIntervalInfo = &pMergeIntervalInfo->intervalAggOperatorInfo;
  pIntervalInfo->win = pTaskInfo->window;
  pIntervalInfo->inputOrder = TSDB_ORDER_ASC;
  pIntervalInfo->interval = interval;
  pIntervalInfo->binfo.mergeResultBlock = pIntervalPhyNode->window.mergeDataBlock;
  pIntervalInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;

  SExprSupp* pExprSupp = &pOperator->exprSupp;

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
  initResultSizeInfo(&pOperator->resultInfo, 4096);

  code = initAggSup(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
                    pTaskInfo->streamInfo.pState);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  SSDataBlock* pResBlock = createDataBlockFromDescNode(pIntervalPhyNode->window.node.pOutputDataBlockDesc);
  initBasicInfo(&pIntervalInfo->binfo, pResBlock);
  initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win);

  pIntervalInfo->timeWindowInterpo = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo);
  if (pIntervalInfo->timeWindowInterpo) {
    pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
    if (pIntervalInfo->binfo.resultRowInfo.openWindow == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }
  }

  initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo);
  setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false,
                  OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeIntervalAgg, NULL, destroyMergeIntervalOperatorInfo,
                                         optrDefaultBufFn, NULL);

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  if (pMergeIntervalInfo != NULL) {
    destroyMergeIntervalOperatorInfo(pMergeIntervalInfo);
  }

  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}
4710 4711 4712

static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
  SStreamIntervalOperatorInfo* pInfo = pOperator->info;
4713 4714
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
  int64_t                      maxTs = INT64_MIN;
5
54liuyao 已提交
4715
  int64_t                      minTs = INT64_MAX;
4716
  SExprSupp*                   pSup = &pOperator->exprSupp;
4717 4718 4719 4720 4721 4722

  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  if (pOperator->status == OP_RES_TO_RETURN) {
4723
    doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
4724
    if (pInfo->pDelRes->info.rows > 0) {
5
54liuyao 已提交
4725
      printDataBlock(pInfo->pDelRes, "single interval delete");
4726 4727 4728
      return pInfo->pDelRes;
    }

4729
    doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
5
54liuyao 已提交
4730 4731 4732
    if (pInfo->binfo.pRes->info.rows > 0) {
      printDataBlock(pInfo->binfo.pRes, "single interval");
      return pInfo->binfo.pRes;
4733
    }
4734 4735
    deleteIntervalDiscBuf(pInfo->pState, NULL, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark, &pInfo->interval,
                          &pInfo->delKey);
H
Haojun Liao 已提交
4736
    setOperatorCompleted(pOperator);
L
Liu Jicong 已提交
4737
    // streamStateCommit(pTaskInfo->streamInfo.pState);
5
54liuyao 已提交
4738
    return NULL;
4739 4740 4741 4742 4743 4744 4745 4746 4747 4748 4749
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];

  SArray*    pUpdated = taosArrayInit(4, POINTER_BYTES);  // SResKeyPos
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  SHashObj*  pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);

  while (1) {
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
    if (pBlock == NULL) {
5
54liuyao 已提交
4750 4751
      qDebug("===stream===return data:single interval. recv datablock num:%" PRIu64, pInfo->numOfDatapack);
      pInfo->numOfDatapack = 0;
4752 4753
      break;
    }
5
54liuyao 已提交
4754
    pInfo->numOfDatapack++;
4755 4756
    printDataBlock(pBlock, "single interval recv");

4757 4758 4759
    if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
        pBlock->info.type == STREAM_CLEAR) {
      doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pUpdatedMap);
4760 4761 4762 4763 4764 4765 4766 4767 4768 4769 4770 4771 4772 4773 4774 4775 4776 4777 4778
      continue;
    } else if (pBlock->info.type == STREAM_GET_ALL) {
      getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap);
      continue;
    }

    if (pBlock->info.type == STREAM_NORMAL && pBlock->info.version != 0) {
      // set input version
      pTaskInfo->version = pBlock->info.version;
    }

    if (pInfo->scalarSupp.pExprInfo != NULL) {
      SExprSupp* pExprSup = &pInfo->scalarSupp;
      projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
    }

    // The timewindow that overlaps the timestamps of the input pBlock need to be recalculated and return to the
    // caller. Note that all the time window are not close till now.
    // the pDataBlock are always the same one, no need to call this again
4779
    setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
4780 4781 4782 4783 4784
    if (pInfo->invertible) {
      setInverFunction(pSup->pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.type);
    }

    maxTs = TMAX(maxTs, pBlock->info.window.ekey);
5
54liuyao 已提交
4785
    minTs = TMIN(minTs, pBlock->info.window.skey);
H
Haojun Liao 已提交
4786

H
Haojun Liao 已提交
4787
    doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pUpdatedMap);
4788 4789
  }
  pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
5
54liuyao 已提交
4790
  pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs);
4791
  pOperator->status = OP_RES_TO_RETURN;
4792
  removeDeleteResults(pUpdatedMap, pInfo->pDelWins);
5
54liuyao 已提交
4793
  closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdatedMap,
4794
                            pInfo->pDelWins, pOperator);
4795 4796 4797 4798 4799 4800 4801 4802 4803 4804

  void* pIte = NULL;
  while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) {
    taosArrayPush(pUpdated, pIte);
  }
  taosArraySort(pUpdated, resultrowComparAsc);

  initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
  blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
  taosHashCleanup(pUpdatedMap);
5
54liuyao 已提交
4805

4806
  doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
4807
  if (pInfo->pDelRes->info.rows > 0) {
5
54liuyao 已提交
4808
    printDataBlock(pInfo->pDelRes, "single interval delete");
4809 4810 4811
    return pInfo->pDelRes;
  }

4812
  doBuildResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
5
54liuyao 已提交
4813 4814 4815 4816 4817 4818
  if (pInfo->binfo.pRes->info.rows > 0) {
    printDataBlock(pInfo->binfo.pRes, "single interval");
    return pInfo->binfo.pRes;
  }

  return NULL;
4819 4820 4821 4822 4823
}

/**
 * Create the single (non-final) stream interval aggregation operator.
 *
 * @param downstream  operator that feeds this one; attached through appendDownstream().
 * @param pPhyNode    physical plan node, read as an SStreamIntervalPhysiNode.
 * @param pTaskInfo   owning task; on failure its `code` field receives the error code.
 * @return the newly created operator, or NULL on failure.
 */
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
                                                SExecTaskInfo* pTaskInfo) {
  // Declared ahead of the first `goto _error`, so the error path never reads an
  // uninitialized value when it stores the code into the task.
  int32_t code = TSDB_CODE_SUCCESS;

  SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo));
  SOperatorInfo*               pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }
  SStreamIntervalPhysiNode* pIntervalPhyNode = (SStreamIntervalPhysiNode*)pPhyNode;

  int32_t    numOfCols = 0;
  SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols);

  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
  SInterval    interval = {
         .interval = pIntervalPhyNode->interval,
         .sliding = pIntervalPhyNode->sliding,
         .intervalUnit = pIntervalPhyNode->intervalUnit,
         .slidingUnit = pIntervalPhyNode->slidingUnit,
         .offset = pIntervalPhyNode->offset,
         .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision,
  };

  STimeWindowAggSupp twAggSupp = {
      .waterMark = pIntervalPhyNode->window.watermark,
      .calTrigger = pIntervalPhyNode->window.triggerType,
      .maxTs = INT64_MIN,
      .minTs = INT64_MAX,
      .deleteMark = getDeleteMark(pIntervalPhyNode),
  };

  ASSERTS(twAggSupp.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay");

  pOperator->pTaskInfo = pTaskInfo;
  pInfo->interval = interval;
  pInfo->twAggSup = twAggSupp;
  pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired;

  SExprSupp* pSup = &pOperator->exprSupp;
  initBasicInfo(&pInfo->binfo, pResBlock);
  // NOTE(review): pSup still holds the zeroed state left by taosMemoryCalloc at this point
  // (initAggSup only runs further down), so this call receives a NULL context array and a
  // count of 0. Confirm whether it was meant to run after initAggSup.
  initStreamFunciton(pSup->pCtx, pSup->numOfExprs);
  initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);

  pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
  initResultSizeInfo(&pOperator->resultInfo, 4096);

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
  code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
                    pTaskInfo->streamInfo.pState);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  // Optional scalar expressions that are applied to each input block before aggregation.
  if (pIntervalPhyNode->window.pExprs != NULL) {
    int32_t    numOfScalar = 0;
    SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar);
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

  // The invertible flag is forced off. The allInvertible() result that used to be stored
  // here was overwritten immediately and never read, so that dead store has been removed.
  pInfo->invertible = false;
  pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
  pInfo->delIndex = 0;
  pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
  initResultRowInfo(&pInfo->binfo.resultRowInfo);

  // The operator works on its own copy of the task-level stream state structure.
  pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
  if (pInfo->pState == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }
  *(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
  streamStateSetNumber(pInfo->pState, -1);

  pInfo->pPhyNode = NULL;  // create new child
  pInfo->pPullDataMap = NULL;
  pInfo->pPullWins = NULL;  // SPullWindowInfo
  pInfo->pullIndex = 0;
  pInfo->pPullDataRes = NULL;
  pInfo->isFinal = false;
  pInfo->pChildren = NULL;
  pInfo->delKey.ts = INT64_MAX;
  pInfo->delKey.groupId = 0;
  pInfo->numOfDatapack = 0;

  setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamIntervalAgg, NULL,
                                         destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL);

  initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup);
  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  // pInfo is NULL when its own allocation failed; skip the destroy call in that case.
  if (pInfo != NULL) {
    destroyStreamFinalIntervalOperatorInfo(pInfo);
  }
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}
H
Haojun Liao 已提交
4925