timewindowoperator.c 188.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15
#include "executorimpl.h"
16
#include "filter.h"
X
Xiaoyu Wang 已提交
17
#include "function.h"
5
54liuyao 已提交
18
#include "functionMgt.h"
H
Haojun Liao 已提交
19
#include "tcommon.h"
5
54liuyao 已提交
20
#include "tcompare.h"
L
Liu Jicong 已提交
21
#include "tdatablock.h"
H
Haojun Liao 已提交
22
#include "tfill.h"
dengyihao's avatar
dengyihao 已提交
23
#include "tglobal.h"
dengyihao's avatar
dengyihao 已提交
24
#include "tlog.h"
25
#include "ttime.h"
26

L
Liu Jicong 已提交
27
#define IS_FINAL_OP(op)    ((op)->isFinal)
L
liuyao 已提交
28 29 30
// #define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL);

#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL);
H
Haojun Liao 已提交
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55

typedef struct SSessionAggOperatorInfo {
  SOptrBasicInfo     binfo;
  SAggSupporter      aggSup;
  SGroupResInfo      groupResInfo;
  SWindowRowsSup     winSup;
  bool               reptScan;  // next round scan
  int64_t            gap;       // session window gap
  int32_t            tsSlotId;  // primary timestamp slot id
  STimeWindowAggSupp twAggSup;
} SSessionAggOperatorInfo;

typedef struct SStateWindowOperatorInfo {
  SOptrBasicInfo     binfo;
  SAggSupporter      aggSup;
  SExprSupp          scalarSup;
  SGroupResInfo      groupResInfo;
  SWindowRowsSup     winSup;
  SColumn            stateCol;  // start row index
  bool               hasKey;
  SStateKeys         stateKey;
  int32_t            tsSlotId;  // primary timestamp column slot id
  STimeWindowAggSupp twAggSup;
} SStateWindowOperatorInfo;

56 57 58 59 60
typedef enum SResultTsInterpType {
  RESULT_ROW_START_INTERP = 1,
  RESULT_ROW_END_INTERP = 2,
} SResultTsInterpType;

5
54liuyao 已提交
61 62
typedef struct SPullWindowInfo {
  STimeWindow window;
L
Liu Jicong 已提交
63
  uint64_t    groupId;
5
54liuyao 已提交
64
  STimeWindow calWin;
5
54liuyao 已提交
65 66
} SPullWindowInfo;

67 68
typedef struct SOpenWindowInfo {
  SResultRowPosition pos;
L
Liu Jicong 已提交
69
  uint64_t           groupId;
70 71
} SOpenWindowInfo;

72 73
static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo);

L
Liu Jicong 已提交
74 75
static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult,
                                              uint64_t groupId);
76 77
static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult);

X
Xiaoyu Wang 已提交
78
static TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols) { return tsCols == NULL ? win->skey : tsCols[0]; }
79 80 81

static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, bool masterscan,
                                      SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
82
                                      int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
83 84
                                      SExecTaskInfo* pTaskInfo) {
  SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
D
dapan1121 已提交
85
                                                  masterscan, tableGroupId, pTaskInfo, true, pAggSup, true);
86 87 88 89 90 91 92 93

  if (pResultRow == NULL) {
    *pResult = NULL;
    return TSDB_CODE_SUCCESS;
  }

  // set time window for current result
  pResultRow->win = (*win);
94

95
  *pResult = pResultRow;
96
  setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
97

98 99 100 101 102 103 104 105 106 107 108 109 110
  return TSDB_CODE_SUCCESS;
}

static void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, bool includeEndpoint) {
  int64_t* ts = (int64_t*)pColData->pData;
  int32_t  delta = includeEndpoint ? 1 : 0;

  int64_t duration = pWin->ekey - pWin->skey + delta;
  ts[2] = duration;            // set the duration
  ts[3] = pWin->skey;          // window start key
  ts[4] = pWin->ekey + delta;  // window end key
}

111
static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) {
112 113 114
  pRowSup->win.ekey = ts;
  pRowSup->prevTs = ts;
  pRowSup->numOfRows += 1;
115
  pRowSup->groupId = groupId;
116 117
}

dengyihao's avatar
dengyihao 已提交
118 119
static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex,
                                     uint64_t groupId) {
120 121 122
  pRowSup->startRowIndex = rowIndex;
  pRowSup->numOfRows = 0;
  pRowSup->win.skey = tsList[rowIndex];
123
  pRowSup->groupId = groupId;
124 125
}

L
Liu Jicong 已提交
126 127
FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos,
                                            int32_t order, int64_t* pData) {
128
  int32_t forwardRows = 0;
129 130 131 132

  if (order == TSDB_ORDER_ASC) {
    int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order);
    if (end >= 0) {
133
      forwardRows = end;
134

S
slzhou 已提交
135
      while (pData[end + pos] == ekey) {
136
        forwardRows += 1;
S
slzhou 已提交
137
        ++pos;
138 139 140
      }
    }
  } else {
141
    int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order);
142
    if (end >= 0) {
143
      forwardRows = end;
144

S
slzhou 已提交
145
      while (pData[end + pos] == ekey) {
146
        forwardRows += 1;
S
slzhou 已提交
147
        ++pos;
148 149
      }
    }
X
Xiaoyu Wang 已提交
150 151 152 153 154 155 156 157
    //    int32_t end = searchFn((char*)pData, pos + 1, ekey, order);
    //    if (end >= 0) {
    //      forwardRows = pos - end;
    //
    //      if (pData[end] == ekey) {
    //        forwardRows += 1;
    //      }
    //    }
158 159
  }

160 161
  assert(forwardRows >= 0);
  return forwardRows;
162 163
}

5
54liuyao 已提交
164
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180
  int32_t midPos = -1;
  int32_t numOfRows;

  if (num <= 0) {
    return -1;
  }

  assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);

  TSKEY*  keyList = (TSKEY*)pValue;
  int32_t firstPos = 0;
  int32_t lastPos = num - 1;

  if (order == TSDB_ORDER_DESC) {
    // find the first position which is smaller than the key
    while (1) {
181 182 183 184 185 186 187 188 189 190 191
      if (key >= keyList[firstPos]) return firstPos;
      if (key == keyList[lastPos]) return lastPos;

      if (key < keyList[lastPos]) {
        lastPos += 1;
        if (lastPos >= num) {
          return -1;
        } else {
          return lastPos;
        }
      }
192 193 194 195 196 197

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;

      if (key < keyList[midPos]) {
        firstPos = midPos + 1;
198 199
      } else if (key > keyList[midPos]) {
        lastPos = midPos - 1;
200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234
      } else {
        break;
      }
    }

  } else {
    // find the first position which is bigger than the key
    while (1) {
      if (key <= keyList[firstPos]) return firstPos;
      if (key == keyList[lastPos]) return lastPos;

      if (key > keyList[lastPos]) {
        lastPos = lastPos + 1;
        if (lastPos >= num)
          return -1;
        else
          return lastPos;
      }

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1u) + firstPos;

      if (key < keyList[midPos]) {
        lastPos = midPos - 1;
      } else if (key > keyList[midPos]) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }
  }

  return midPos;
}

X
Xiaoyu Wang 已提交
235 236
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
                                 __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order) {
237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257
  assert(startPos >= 0 && startPos < pDataBlockInfo->rows);

  int32_t num = -1;
  int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);

  if (order == TSDB_ORDER_ASC) {
    if (ekey < pDataBlockInfo->window.ekey && pPrimaryColumn) {
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
      if (item != NULL) {
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
      }
    } else {
      num = pDataBlockInfo->rows - startPos;
      if (item != NULL) {
        item->lastKey = pDataBlockInfo->window.ekey + step;
      }
    }
  } else {  // desc
    if (ekey > pDataBlockInfo->window.skey && pPrimaryColumn) {
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
      if (item != NULL) {
258
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
259 260
      }
    } else {
261
      num = pDataBlockInfo->rows - startPos;
262
      if (item != NULL) {
263
        item->lastKey = pDataBlockInfo->window.ekey + step;
264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289
      }
    }
  }

  assert(num >= 0);
  return num;
}

static void getNextTimeWindow(SInterval* pInterval, int32_t precision, int32_t order, STimeWindow* tw) {
  int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order);
  if (pInterval->intervalUnit != 'n' && pInterval->intervalUnit != 'y') {
    tw->skey += pInterval->sliding * factor;
    tw->ekey = tw->skey + pInterval->interval - 1;
    return;
  }

  int64_t key = tw->skey, interval = pInterval->interval;
  // convert key to second
  key = convertTimePrecision(key, precision, TSDB_TIME_PRECISION_MILLI) / 1000;

  if (pInterval->intervalUnit == 'y') {
    interval *= 12;
  }

  struct tm tm;
  time_t    t = (time_t)key;
290
  taosLocalTime(&t, &tm, NULL);
291 292 293 294

  int mon = (int)(tm.tm_year * 12 + tm.tm_mon + interval * factor);
  tm.tm_year = mon / 12;
  tm.tm_mon = mon % 12;
wafwerar's avatar
wafwerar 已提交
295
  tw->skey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, precision);
296 297 298 299

  mon = (int)(mon + interval);
  tm.tm_year = mon / 12;
  tm.tm_mon = mon % 12;
wafwerar's avatar
wafwerar 已提交
300
  tw->ekey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, precision);
301 302 303 304

  tw->ekey -= 1;
}

5
54liuyao 已提交
305 306 307 308
void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order) {
  getNextTimeWindow(pInterval, pInterval->precision, order, tw);
}

309 310
void doTimeWindowInterpolation(SArray* pPrevValues, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs,
                               int32_t curRowIndex, TSKEY windowKey, int32_t type, SExprSupp* pSup) {
311
  SqlFunctionCtx* pCtx = pSup->pCtx;
312

313
  int32_t index = 1;
314
  for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
H
Haojun Liao 已提交
315
    if (!fmIsIntervalInterpoFunc(pCtx[k].functionId)) {
316 317 318 319
      pCtx[k].start.key = INT64_MIN;
      continue;
    }

X
Xiaoyu Wang 已提交
320
    SFunctParam*     pParam = &pCtx[k].param[0];
321 322
    SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, pParam->pCol->slotId);

323
    ASSERT(pColInfo->info.type == pParam->pCol->type && curTs != windowKey);
324

325
    double v1 = 0, v2 = 0, v = 0;
326
    if (prevRowIndex == -1) {
327
      SGroupKeys* p = taosArrayGet(pPrevValues, index);
328
      GET_TYPED_DATA(v1, double, pColInfo->info.type, p->pData);
329
    } else {
330
      GET_TYPED_DATA(v1, double, pColInfo->info.type, colDataGetData(pColInfo, prevRowIndex));
331 332
    }

333
    GET_TYPED_DATA(v2, double, pColInfo->info.type, colDataGetData(pColInfo, curRowIndex));
334

335
#if 0
336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354
    if (functionId == FUNCTION_INTERP) {
      if (type == RESULT_ROW_START_INTERP) {
        pCtx[k].start.key = prevTs;
        pCtx[k].start.val = v1;

        pCtx[k].end.key = curTs;
        pCtx[k].end.val = v2;

        if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
          if (prevRowIndex == -1) {
            //            pCtx[k].start.ptr = (char*)pRuntimeEnv->prevRow[index];
          } else {
            pCtx[k].start.ptr = (char*)pColInfo->pData + prevRowIndex * pColInfo->info.bytes;
          }

          pCtx[k].end.ptr = (char*)pColInfo->pData + curRowIndex * pColInfo->info.bytes;
        }
      }
    } else if (functionId == FUNCTION_TWA) {
355 356
#endif

X
Xiaoyu Wang 已提交
357 358 359
    SPoint point1 = (SPoint){.key = prevTs, .val = &v1};
    SPoint point2 = (SPoint){.key = curTs, .val = &v2};
    SPoint point = (SPoint){.key = windowKey, .val = &v};
360

X
Xiaoyu Wang 已提交
361
    taosGetLinearInterpolationVal(&point, TSDB_DATA_TYPE_DOUBLE, &point1, &point2, TSDB_DATA_TYPE_DOUBLE);
362

X
Xiaoyu Wang 已提交
363 364 365 366 367 368
    if (type == RESULT_ROW_START_INTERP) {
      pCtx[k].start.key = point.key;
      pCtx[k].start.val = v;
    } else {
      pCtx[k].end.key = point.key;
      pCtx[k].end.val = v;
369
    }
X
Xiaoyu Wang 已提交
370 371 372

    index += 1;
  }
373
#if 0
374
  }
375
#endif
376 377 378 379 380 381 382 383 384 385 386 387 388 389
}

static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t type) {
  if (type == RESULT_ROW_START_INTERP) {
    for (int32_t k = 0; k < numOfOutput; ++k) {
      pCtx[k].start.key = INT64_MIN;
    }
  } else {
    for (int32_t k = 0; k < numOfOutput; ++k) {
      pCtx[k].end.key = INT64_MIN;
    }
  }
}

390 391
static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, int32_t pos, SSDataBlock* pBlock,
                                              const TSKEY* tsCols, STimeWindow* win, SExprSupp* pSup) {
392
  bool ascQuery = (pInfo->inputOrder == TSDB_ORDER_ASC);
393

394
  TSKEY curTs = tsCols[pos];
395 396

  SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
X
Xiaoyu Wang 已提交
397
  TSKEY       lastTs = *(int64_t*)pTsKey->pData;
398 399 400 401 402

  // lastTs == INT64_MIN and pos == 0 means this is the first time window, interpolation is not needed.
  // start exactly from this point, no need to do interpolation
  TSKEY key = ascQuery ? win->skey : win->ekey;
  if (key == curTs) {
403
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
404 405 406
    return true;
  }

407 408
  // it is the first time window, no need to do interpolation
  if (pTsKey->isNull && pos == 0) {
409
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
410 411
  } else {
    TSKEY prevTs = ((pos == 0) ? lastTs : tsCols[pos - 1]);
412 413
    doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, pos - 1, curTs, pos, key,
                              RESULT_ROW_START_INTERP, pSup);
414 415 416 417 418
  }

  return true;
}

419 420 421
static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex,
                                            SArray* pDataBlock, const TSKEY* tsCols, TSKEY blockEkey,
                                            STimeWindow* win) {
422
  int32_t order = pInfo->inputOrder;
423 424

  TSKEY actualEndKey = tsCols[endRowIndex];
425
  TSKEY key = (order == TSDB_ORDER_ASC) ? win->ekey : win->skey;
426 427

  // not ended in current data block, do not invoke interpolation
428
  if ((key > blockEkey && (order == TSDB_ORDER_ASC)) || (key < blockEkey && (order == TSDB_ORDER_DESC))) {
429
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
430 431 432
    return false;
  }

433
  // there is actual end point of current time window, no interpolation needs
434
  if (key == actualEndKey) {
435
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
436 437 438
    return true;
  }

439
  int32_t nextRowIndex = endRowIndex + 1;
440 441 442
  assert(nextRowIndex >= 0);

  TSKEY nextKey = tsCols[nextRowIndex];
443 444
  doTimeWindowInterpolation(pInfo->pPrevValues, pDataBlock, actualEndKey, endRowIndex, nextKey, nextRowIndex, key,
                            RESULT_ROW_END_INTERP, pSup);
445 446 447
  return true;
}

448 449
bool inCalSlidingWindow(SInterval* pInterval, STimeWindow* pWin, TSKEY calStart, TSKEY calEnd) {
  if (pInterval->interval != pInterval->sliding && (pWin->ekey < calStart || pWin->skey > calEnd)) {
5
54liuyao 已提交
450 451 452 453 454
    return false;
  }
  return true;
}

455 456 457 458
bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo) {
  return inCalSlidingWindow(pInterval, pWin, pBlockInfo->calWin.skey, pBlockInfo->calWin.ekey);
}

459
static int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
5
54liuyao 已提交
460
                                      TSKEY* primaryKeys, int32_t prevPosition, int32_t order) {
X
Xiaoyu Wang 已提交
461
  bool ascQuery = (order == TSDB_ORDER_ASC);
462 463 464 465 466 467 468 469 470 471

  int32_t precision = pInterval->precision;
  getNextTimeWindow(pInterval, precision, order, pNext);

  // next time window is not in current block
  if ((pNext->skey > pDataBlockInfo->window.ekey && order == TSDB_ORDER_ASC) ||
      (pNext->ekey < pDataBlockInfo->window.skey && order == TSDB_ORDER_DESC)) {
    return -1;
  }

5
54liuyao 已提交
472 473 474 475
  if (!inSlidingWindow(pInterval, pNext, pDataBlockInfo) && order == TSDB_ORDER_ASC) {
    return -1;
  }

476
  TSKEY   skey = ascQuery ? pNext->skey : pNext->ekey;
477 478 479 480
  int32_t startPos = 0;

  // tumbling time window query, a special case of sliding time window query
  if (pInterval->sliding == pInterval->interval && prevPosition != -1) {
481
    startPos = prevPosition + 1;
482
  } else {
483
    if ((skey <= pDataBlockInfo->window.skey && ascQuery) || (skey >= pDataBlockInfo->window.ekey && !ascQuery)) {
484 485
      startPos = 0;
    } else {
486
      startPos = binarySearchForKey((char*)primaryKeys, pDataBlockInfo->rows, skey, order);
487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529
    }
  }

  /* interp query with fill should not skip time window */
  //  if (pQueryAttr->pointInterpQuery && pQueryAttr->fillType != TSDB_FILL_NONE) {
  //    return startPos;
  //  }

  /*
   * This time window does not cover any data, try next time window,
   * this case may happen when the time window is too small
   */
  if (primaryKeys == NULL) {
    if (ascQuery) {
      assert(pDataBlockInfo->window.skey <= pNext->ekey);
    } else {
      assert(pDataBlockInfo->window.ekey >= pNext->skey);
    }
  } else {
    if (ascQuery && primaryKeys[startPos] > pNext->ekey) {
      TSKEY next = primaryKeys[startPos];
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
        pNext->skey = taosTimeTruncate(next, pInterval, precision);
        pNext->ekey = taosTimeAdd(pNext->skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
      } else {
        pNext->ekey += ((next - pNext->ekey + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
        pNext->skey = pNext->ekey - pInterval->interval + 1;
      }
    } else if ((!ascQuery) && primaryKeys[startPos] < pNext->skey) {
      TSKEY next = primaryKeys[startPos];
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
        pNext->skey = taosTimeTruncate(next, pInterval, precision);
        pNext->ekey = taosTimeAdd(pNext->skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
      } else {
        pNext->skey -= ((pNext->skey - next + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
        pNext->ekey = pNext->skey + pInterval->interval - 1;
      }
    }
  }

  return startPos;
}

530
static bool isResultRowInterpolated(SResultRow* pResult, SResultTsInterpType type) {
531
  ASSERT(pResult != NULL && (type == RESULT_ROW_START_INTERP || type == RESULT_ROW_END_INTERP));
532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547
  if (type == RESULT_ROW_START_INTERP) {
    return pResult->startInterp == true;
  } else {
    return pResult->endInterp == true;
  }
}

static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
  assert(pResult != NULL && (type == RESULT_ROW_START_INTERP || type == RESULT_ROW_END_INTERP));
  if (type == RESULT_ROW_START_INTERP) {
    pResult->startInterp = true;
  } else {
    pResult->endInterp = true;
  }
}

548 549
static void doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataBlock* pBlock, SResultRow* pResult,
                                        STimeWindow* win, int32_t startPos, int32_t forwardRows, SExprSupp* pSup) {
550
  if (!pInfo->timeWindowInterpo) {
551 552 553
    return;
  }

554
  ASSERT(pBlock != NULL);
555 556 557 558 559
  if (pBlock->pDataBlock == NULL) {
    //    tscError("pBlock->pDataBlock == NULL");
    return;
  }

560
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
561 562

  TSKEY* tsCols = (TSKEY*)(pColInfo->pData);
563
  bool   done = isResultRowInterpolated(pResult, RESULT_ROW_START_INTERP);
564
  if (!done) {  // it is not interpolated, now start to generated the interpolated value
565
    bool interp = setTimeWindowInterpolationStartTs(pInfo, startPos, pBlock, tsCols, win, pSup);
566 567 568 569
    if (interp) {
      setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
    }
  } else {
570
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
571 572 573 574 575 576 577 578
  }

  // point interpolation does not require the end key time window interpolation.
  //  if (pointInterpQuery) {
  //    return;
  //  }

  // interpolation query does not generate the time window end interpolation
579
  done = isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
580
  if (!done) {
581
    int32_t endRowIndex = startPos + forwardRows - 1;
582

583
    TSKEY endKey = (pInfo->inputOrder == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey;
584
    bool  interp = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, pBlock->pDataBlock, tsCols, endKey, win);
585 586 587 588
    if (interp) {
      setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
    }
  } else {
589
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
590 591 592
  }
}

593 594
static void saveDataBlockLastRow(SArray* pPrevKeys, const SSDataBlock* pBlock, SArray* pCols) {
  if (pBlock->pDataBlock == NULL) {
595 596 597
    return;
  }

598 599 600 601 602 603 604
  size_t num = taosArrayGetSize(pPrevKeys);
  for (int32_t k = 0; k < num; ++k) {
    SColumn* pc = taosArrayGet(pCols, k);

    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pc->slotId);

    SGroupKeys* pkey = taosArrayGet(pPrevKeys, k);
X
Xiaoyu Wang 已提交
605
    for (int32_t i = pBlock->info.rows - 1; i >= 0; --i) {
606 607 608 609 610 611 612
      if (colDataIsNull_s(pColInfo, i)) {
        continue;
      }

      char* val = colDataGetData(pColInfo, i);
      if (IS_VAR_DATA_TYPE(pkey->type)) {
        memcpy(pkey->pData, val, varDataTLen(val));
613
        ASSERT(varDataTLen(val) <= pkey->bytes);
614 615 616 617 618 619
      } else {
        memcpy(pkey->pData, val, pkey->bytes);
      }

      break;
    }
620 621 622
  }
}

623 624 625 626
static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t numOfExprs, SResultRowInfo* pResultRowInfo,
                                       SSDataBlock* pBlock, int32_t scanFlag, int64_t* tsCols, SResultRowPosition* p) {
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;

627
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
628
  SExprSupp*                pSup = &pOperatorInfo->exprSupp;
629

L
Liu Jicong 已提交
630 631
  int32_t startPos = 0;
  int32_t numOfOutput = pSup->numOfExprs;
632

633
  SResultRow* pResult = NULL;
634

635
  while (1) {
L
Liu Jicong 已提交
636 637 638
    SListNode*          pn = tdListGetHead(pResultRowInfo->openWindow);
    SOpenWindowInfo*    pOpenWin = (SOpenWindowInfo*)pn->data;
    uint64_t            groupId = pOpenWin->groupId;
639
    SResultRowPosition* p1 = &pOpenWin->pos;
640 641 642
    if (p->pageId == p1->pageId && p->offset == p1->offset) {
      break;
    }
643

644
    SResultRow* pr = getResultRowByPos(pInfo->aggSup.pResultBuf, p1, false);
645 646 647
    if (NULL == pr) {
      T_LONG_JMP(pTaskInfo->env, terrno);
    }
648

649
    ASSERT(pr->offset == p1->offset && pr->pageId == p1->pageId);
650

651
    if (pr->closed) {
652
      ASSERT(isResultRowInterpolated(pr, RESULT_ROW_START_INTERP) &&
X
Xiaoyu Wang 已提交
653
             isResultRowInterpolated(pr, RESULT_ROW_END_INTERP));
654 655
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
      taosMemoryFree(pNode);
656 657
      continue;
    }
658

659
    STimeWindow w = pr->win;
660 661
    int32_t     ret = setTimeWindowOutputBuf(pResultRowInfo, &w, (scanFlag == MAIN_SCAN), &pResult, groupId, pSup->pCtx,
                                             numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
662
    if (ret != TSDB_CODE_SUCCESS) {
S
Shengliang Guan 已提交
663
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
664 665
    }

666
    ASSERT(!isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
667

X
Xiaoyu Wang 已提交
668 669
    SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
    int64_t     prevTs = *(int64_t*)pTsKey->pData;
H
Haojun Liao 已提交
670
    if (groupId == pBlock->info.id.groupId) {
671 672 673
      doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, -1, tsCols[startPos], startPos, w.ekey,
                                RESULT_ROW_END_INTERP, pSup);
    }
674 675

    setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
676
    setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP);
677

678
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, true);
L
Liu Jicong 已提交
679 680
    applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0,
                                    pBlock->info.rows, numOfExprs);
681 682 683

    if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
      closeResultRow(pr);
684 685
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
      taosMemoryFree(pNode);
X
Xiaoyu Wang 已提交
686
    } else {  // the remains are can not be closed yet.
687
      break;
688
    }
689
  }
690
}
691

5
54liuyao 已提交
692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747
typedef int32_t (*__compare_fn_t)(void* pKey, void* data, int32_t index);

int32_t binarySearchCom(void* keyList, int num, void* pKey, int order, __compare_fn_t comparefn) {
  int firstPos = 0, lastPos = num - 1, midPos = -1;
  int numOfRows = 0;

  if (num <= 0) return -1;
  if (order == TSDB_ORDER_DESC) {
    // find the first position which is smaller or equal than the key
    while (1) {
      if (comparefn(pKey, keyList, lastPos) >= 0) return lastPos;
      if (comparefn(pKey, keyList, firstPos) == 0) return firstPos;
      if (comparefn(pKey, keyList, firstPos) < 0) return firstPos - 1;

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;

      if (comparefn(pKey, keyList, midPos) < 0) {
        lastPos = midPos - 1;
      } else if (comparefn(pKey, keyList, midPos) > 0) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }

  } else {
    // find the first position which is bigger or equal than the key
    while (1) {
      if (comparefn(pKey, keyList, firstPos) <= 0) return firstPos;
      if (comparefn(pKey, keyList, lastPos) == 0) return lastPos;

      if (comparefn(pKey, keyList, lastPos) > 0) {
        lastPos = lastPos + 1;
        if (lastPos >= num)
          return -1;
        else
          return lastPos;
      }

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;

      if (comparefn(pKey, keyList, midPos) < 0) {
        lastPos = midPos - 1;
      } else if (comparefn(pKey, keyList, midPos) > 0) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }
  }

  return midPos;
}

5
54liuyao 已提交
748
typedef int64_t (*__get_value_fn_t)(void* data, int32_t index);
749

X
Xiaoyu Wang 已提交
750 751 752
int32_t binarySearch(void* keyList, int num, TSKEY key, int order, __get_value_fn_t getValuefn) {
  int firstPos = 0, lastPos = num - 1, midPos = -1;
  int numOfRows = 0;
5
54liuyao 已提交
753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798

  if (num <= 0) return -1;
  if (order == TSDB_ORDER_DESC) {
    // find the first position which is smaller or equal than the key
    while (1) {
      if (key >= getValuefn(keyList, lastPos)) return lastPos;
      if (key == getValuefn(keyList, firstPos)) return firstPos;
      if (key < getValuefn(keyList, firstPos)) return firstPos - 1;

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;

      if (key < getValuefn(keyList, midPos)) {
        lastPos = midPos - 1;
      } else if (key > getValuefn(keyList, midPos)) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }

  } else {
    // find the first position which is bigger or equal than the key
    while (1) {
      if (key <= getValuefn(keyList, firstPos)) return firstPos;
      if (key == getValuefn(keyList, lastPos)) return lastPos;

      if (key > getValuefn(keyList, lastPos)) {
        lastPos = lastPos + 1;
        if (lastPos >= num)
          return -1;
        else
          return lastPos;
      }

      numOfRows = lastPos - firstPos + 1;
      midPos = (numOfRows >> 1) + firstPos;

      if (key < getValuefn(keyList, midPos)) {
        lastPos = midPos - 1;
      } else if (key > getValuefn(keyList, midPos)) {
        firstPos = midPos + 1;
      } else {
        break;
      }
    }
799 800
  }

5
54liuyao 已提交
801 802 803
  return midPos;
}

5
54liuyao 已提交
804
int32_t comparePullWinKey(void* pKey, void* data, int32_t index) {
L
Liu Jicong 已提交
805
  SArray*          res = (SArray*)data;
5
54liuyao 已提交
806
  SPullWindowInfo* pos = taosArrayGet(res, index);
L
Liu Jicong 已提交
807
  SPullWindowInfo* pData = (SPullWindowInfo*)pKey;
5
54liuyao 已提交
808
  if (pData->groupId > pos->groupId) {
5
54liuyao 已提交
809
    return 1;
5
54liuyao 已提交
810 811 812 813 814 815 816 817
  } else if (pData->groupId < pos->groupId) {
    return -1;
  }

  if (pData->window.skey > pos->window.ekey) {
    return 1;
  } else if (pData->window.ekey < pos->window.skey) {
    return -1;
5
54liuyao 已提交
818
  }
5
54liuyao 已提交
819
  return 0;
5
54liuyao 已提交
820 821 822 823 824 825 826 827
}

static int32_t savePullWindow(SPullWindowInfo* pPullInfo, SArray* pPullWins) {
  int32_t size = taosArrayGetSize(pPullWins);
  int32_t index = binarySearchCom(pPullWins, size, pPullInfo, TSDB_ORDER_DESC, comparePullWinKey);
  if (index == -1) {
    index = 0;
  } else {
5
54liuyao 已提交
828 829
    int32_t code = comparePullWinKey(pPullInfo, pPullWins, index);
    if (code == 0) {
L
Liu Jicong 已提交
830
      SPullWindowInfo* pos = taosArrayGet(pPullWins, index);
5
54liuyao 已提交
831 832 833 834
      pos->window.skey = TMIN(pos->window.skey, pPullInfo->window.skey);
      pos->window.ekey = TMAX(pos->window.ekey, pPullInfo->window.ekey);
      pos->calWin.skey = TMIN(pos->calWin.skey, pPullInfo->calWin.skey);
      pos->calWin.ekey = TMAX(pos->calWin.ekey, pPullInfo->calWin.ekey);
5
54liuyao 已提交
835
      return TSDB_CODE_SUCCESS;
L
Liu Jicong 已提交
836
    } else if (code > 0) {
5
54liuyao 已提交
837
      index++;
5
54liuyao 已提交
838 839 840 841 842 843 844 845
    }
  }
  if (taosArrayInsert(pPullWins, index, pPullInfo) == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  return TSDB_CODE_SUCCESS;
}

5
54liuyao 已提交
846 847 848
static int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated) {
  winInfo.sessionWin.win.ekey = winInfo.sessionWin.win.skey;
  return tSimpleHashPut(pStUpdated, &winInfo.sessionWin, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo));
5
54liuyao 已提交
849 850
}

5
54liuyao 已提交
851 852
static int32_t saveWinResult(SWinKey* pKey, SRowBuffPos* pPos, SSHashObj* pUpdatedMap) {
  tSimpleHashPut(pUpdatedMap, pKey, sizeof(SWinKey), &pPos, POINTER_BYTES);
5
54liuyao 已提交
853
  return TSDB_CODE_SUCCESS;
5
54liuyao 已提交
854 855
}

5
54liuyao 已提交
856 857 858 859
static int32_t saveWinResultInfo(TSKEY ts, uint64_t groupId, SRowBuffPos* pPos, SSHashObj* pUpdatedMap) {
  SWinKey key = {.ts = ts, .groupId = groupId};
  saveWinResult(&key, pPos, pUpdatedMap);
  return TSDB_CODE_SUCCESS;
5
54liuyao 已提交
860 861
}

862
static void removeResults(SArray* pWins, SSHashObj* pUpdatedMap) {
5
54liuyao 已提交
863 864
  int32_t size = taosArrayGetSize(pWins);
  for (int32_t i = 0; i < size; i++) {
H
Haojun Liao 已提交
865
    SWinKey* pW = taosArrayGet(pWins, i);
866
    void*    tmp = tSimpleHashGet(pUpdatedMap, pW, sizeof(SWinKey));
5
54liuyao 已提交
867 868 869
    if (tmp) {
      void* value = *(void**)tmp;
      taosMemoryFree(value);
870
      tSimpleHashRemove(pUpdatedMap, pW, sizeof(SWinKey));
5
54liuyao 已提交
871
    }
5
54liuyao 已提交
872 873 874
  }
}

875
int32_t compareWinKey(void* pKey, void* data, int32_t index) {
L
liuyao 已提交
876 877
  void* pDataPos = taosArrayGet((SArray*)data, index);
  return winKeyCmprImpl(pKey, pDataPos);
5
54liuyao 已提交
878 879
}

880
static void removeDeleteResults(SSHashObj* pUpdatedMap, SArray* pDelWins) {
5
54liuyao 已提交
881 882
  taosArraySort(pDelWins, winKeyCmprImpl);
  taosArrayRemoveDuplicate(pDelWins, winKeyCmprImpl, NULL);
L
Liu Jicong 已提交
883
  int32_t delSize = taosArrayGetSize(pDelWins);
884
  if (tSimpleHashGetSize(pUpdatedMap) == 0 || delSize == 0) {
5
54liuyao 已提交
885
    return;
dengyihao's avatar
dengyihao 已提交
886
  }
887 888 889 890 891 892
  void*   pIte = NULL;
  int32_t iter = 0;
  while ((pIte = tSimpleHashIterate(pUpdatedMap, pIte, &iter)) != NULL) {
    SWinKey* pResKey = tSimpleHashGetKey(pIte, NULL);
    int32_t  index = binarySearchCom(pDelWins, delSize, pResKey, TSDB_ORDER_DESC, compareWinKey);
    if (index >= 0 && 0 == compareWinKey(pResKey, pDelWins, index)) {
893
      taosArrayRemove(pDelWins, index);
894
      delSize = taosArrayGetSize(pDelWins);
895 896 897 898
    }
  }
}

5
54liuyao 已提交
899
bool isOverdue(TSKEY ekey, STimeWindowAggSupp* pTwSup) {
5
54liuyao 已提交
900
  ASSERTS(pTwSup->maxTs == INT64_MIN || pTwSup->maxTs > 0, "maxts should greater than 0");
5
54liuyao 已提交
901
  return pTwSup->maxTs != INT64_MIN && ekey < pTwSup->maxTs - pTwSup->waterMark;
5
54liuyao 已提交
902 903
}

5
54liuyao 已提交
904 905 906 907 908
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pTwSup) { return isOverdue(pWin->ekey, pTwSup); }

bool needDeleteWindowBuf(STimeWindow* pWin, STimeWindowAggSupp* pTwSup) {
  return pTwSup->maxTs != INT64_MIN && pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark;
}
5
54liuyao 已提交
909

5
54liuyao 已提交
910
static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
911
                            int32_t scanFlag) {
912
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
913

914
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
915
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;
916

X
Xiaoyu Wang 已提交
917
  int32_t     startPos = 0;
918
  int32_t     numOfOutput = pSup->numOfExprs;
X
Xiaoyu Wang 已提交
919
  int64_t*    tsCols = extractTsCol(pBlock, pInfo);
H
Haojun Liao 已提交
920
  uint64_t    tableGroupId = pBlock->info.id.groupId;
921
  bool        ascScan = (pInfo->inputOrder == TSDB_ORDER_ASC);
X
Xiaoyu Wang 已提交
922 923
  TSKEY       ts = getStartTsKey(&pBlock->info.window, tsCols);
  SResultRow* pResult = NULL;
924

925 926
  STimeWindow win =
      getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->inputOrder);
927 928
  int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
929
  if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
S
Shengliang Guan 已提交
930
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
931
  }
932

X
Xiaoyu Wang 已提交
933 934
  TSKEY   ekey = ascScan ? win.ekey : win.skey;
  int32_t forwardRows =
935
      getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->inputOrder);
936 937

  // prev time window not interpolation yet.
938
  if (pInfo->timeWindowInterpo) {
939
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId);
940
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
941 942

    // restore current time window
943 944
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
                                 numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
945
    if (ret != TSDB_CODE_SUCCESS) {
S
Shengliang Guan 已提交
946
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
947 948
    }

949
    // window start key interpolation
950
    doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup);
951
  }
952

953
  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true);
L
Liu Jicong 已提交
954 955
  applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
                                  pBlock->info.rows, numOfOutput);
956 957

  doCloseWindow(pResultRowInfo, pInfo, pResult);
958 959 960

  STimeWindow nextWin = win;
  while (1) {
961
    int32_t prevEndPos = forwardRows - 1 + startPos;
962
    startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, pInfo->inputOrder);
963 964 965 966
    if (startPos < 0) {
      break;
    }
    // null data, failed to allocate more memory buffer
X
Xiaoyu Wang 已提交
967
    int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
968
                                          pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
969
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
S
Shengliang Guan 已提交
970
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
971 972
    }

X
Xiaoyu Wang 已提交
973
    ekey = ascScan ? nextWin.ekey : nextWin.skey;
974
    forwardRows =
975
        getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->inputOrder);
976
    // window start(end) key interpolation
977
    doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
L
Liu Jicong 已提交
978
    // TODO: add to open window? how to close the open windows after input blocks exhausted?
S
shenglian zhou 已提交
979
#if 0
980 981 982 983
    if ((ascScan && ekey <= pBlock->info.window.ekey) ||
        (!ascScan && ekey >= pBlock->info.window.skey)) {
      // window start(end) key interpolation
      doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
984
    } else if (pInfo->timeWindowInterpo) {
985 986
      addToOpenWindowList(pResultRowInfo, pResult, tableGroupId);
    }
S
shenglian zhou 已提交
987
#endif
988
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
L
Liu Jicong 已提交
989 990
    applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
                                    pBlock->info.rows, numOfOutput);
991
    doCloseWindow(pResultRowInfo, pInfo, pResult);
992 993 994
  }

  if (pInfo->timeWindowInterpo) {
995
    saveDataBlockLastRow(pInfo->pPrevValues, pBlock, pInfo->pInterpCols);
996
  }
997 998 999 1000 1001 1002
}

void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult) {
  // current result is done in computing final results.
  if (pInfo->timeWindowInterpo && isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
    closeResultRow(pResult);
1003
    SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
D
dapan1121 已提交
1004
    taosMemoryFree(pNode);
1005 1006 1007
  }
}

1008 1009 1010 1011 1012
SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId) {
  SOpenWindowInfo openWin = {0};
  openWin.pos.pageId = pResult->pageId;
  openWin.pos.offset = pResult->offset;
  openWin.groupId = groupId;
L
Liu Jicong 已提交
1013
  SListNode* pn = tdListGetTail(pResultRowInfo->openWindow);
1014
  if (pn == NULL) {
1015 1016
    tdListAppend(pResultRowInfo->openWindow, &openWin);
    return openWin.pos;
1017 1018
  }

L
Liu Jicong 已提交
1019
  SOpenWindowInfo* px = (SOpenWindowInfo*)pn->data;
1020 1021
  if (px->pos.pageId != openWin.pos.pageId || px->pos.offset != openWin.pos.offset || px->groupId != openWin.groupId) {
    tdListAppend(pResultRowInfo->openWindow, &openWin);
1022 1023
  }

1024
  return openWin.pos;
1025 1026 1027 1028
}

int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo) {
  TSKEY* tsCols = NULL;
1029

D
dapan1121 已提交
1030
  if (pBlock->pDataBlock != NULL && pBlock->info.dataLoad) {
1031 1032
    SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
    tsCols = (int64_t*)pColDataInfo->pData;
H
Haojun Liao 已提交
1033
    ASSERT(tsCols[0] != 0);
1034

1035 1036 1037 1038 1039 1040
    // no data in primary ts
    if (tsCols[0] == 0 && tsCols[pBlock->info.rows - 1] == 0) {
      return NULL;
    }

    if (tsCols[0] != 0 && (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0)) {
1041 1042 1043 1044 1045
      blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
    }
  }

  return tsCols;
1046 1047 1048 1049 1050 1051 1052
}

static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

1053 1054 1055
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SOperatorInfo* downstream = pOperator->pDownstream[0];

1056
  SIntervalAggOperatorInfo* pInfo = pOperator->info;
1057
  SExprSupp*                pSup = &pOperator->exprSupp;
1058

1059
  int32_t scanFlag = MAIN_SCAN;
1060
  int64_t st = taosGetTimestampUs();
1061 1062

  while (1) {
1063
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
1064 1065 1066 1067
    if (pBlock == NULL) {
      break;
    }

1068
    getTableScanInfo(pOperator, &pInfo->inputOrder, &scanFlag, true);
1069

1070
    if (pInfo->scalarSupp.pExprInfo != NULL) {
L
Liu Jicong 已提交
1071 1072
      SExprSupp* pExprSup = &pInfo->scalarSupp;
      projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
1073 1074
    }

1075
    // the pDataBlock are always the same one, no need to call this again
1076
    setInputDataBlock(pSup, pBlock, pInfo->inputOrder, scanFlag, true);
1077
    hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag);
1078 1079
  }

1080
  initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->resultTsOrder);
1081
  OPTR_SET_OPENED(pOperator);
1082 1083

  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
1084 1085 1086
  return TSDB_CODE_SUCCESS;
}

1087 1088 1089 1090 1091
static bool compareVal(const char* v, const SStateKeys* pKey) {
  if (IS_VAR_DATA_TYPE(pKey->type)) {
    if (varDataLen(v) != varDataLen(pKey->pData)) {
      return false;
    } else {
D
dapan1121 已提交
1092
      return memcmp(varDataVal(v), varDataVal(pKey->pData), varDataLen(v)) == 0;
1093 1094 1095 1096 1097 1098
    }
  } else {
    return memcmp(pKey->pData, v, pKey->bytes) == 0;
  }
}

1099
static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock) {
L
Liu Jicong 已提交
1100
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
1101
  SExprSupp*     pSup = &pOperator->exprSupp;
1102

1103
  SColumnInfoData* pStateColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->stateCol.slotId);
H
Haojun Liao 已提交
1104
  int64_t          gid = pBlock->info.id.groupId;
1105 1106

  bool    masterScan = true;
1107
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
1108 1109
  int16_t bytes = pStateColInfoData->info.bytes;

1110
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
1111 1112 1113 1114 1115
  TSKEY*           tsList = (TSKEY*)pColInfoData->pData;

  SWindowRowsSup* pRowSup = &pInfo->winSup;
  pRowSup->numOfRows = 0;

1116
  struct SColumnDataAgg* pAgg = NULL;
1117
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
X
Xiaoyu Wang 已提交
1118
    pAgg = (pBlock->pBlockAgg != NULL) ? pBlock->pBlockAgg[pInfo->stateCol.slotId] : NULL;
1119
    if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pAgg)) {
1120 1121 1122 1123 1124
      continue;
    }

    char* val = colDataGetData(pStateColInfoData, j);

1125
    if (gid != pRowSup->groupId || !pInfo->hasKey) {
1126 1127 1128 1129 1130 1131 1132
      // todo extract method
      if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) {
        varDataCopy(pInfo->stateKey.pData, val);
      } else {
        memcpy(pInfo->stateKey.pData, val, bytes);
      }

1133 1134
      pInfo->hasKey = true;

1135 1136
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
      doKeepTuple(pRowSup, tsList[j], gid);
1137
    } else if (compareVal(val, &pInfo->stateKey)) {
1138
      doKeepTuple(pRowSup, tsList[j], gid);
1139 1140 1141 1142 1143 1144 1145 1146 1147 1148
      if (j == 0 && pRowSup->startRowIndex != 0) {
        pRowSup->startRowIndex = 0;
      }
    } else {  // a new state window started
      SResultRow* pResult = NULL;

      // keep the time window for the closed time window.
      STimeWindow window = pRowSup->win;

      pRowSup->win.ekey = pRowSup->win.skey;
1149 1150
      int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
                                           numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1151
      if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
S
Shengliang Guan 已提交
1152
        T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
1153 1154 1155
      }

      updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false);
H
Haojun Liao 已提交
1156
      applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
L
Liu Jicong 已提交
1157
                                      pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
1158 1159

      // here we start a new session window
1160 1161
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
      doKeepTuple(pRowSup, tsList[j], gid);
1162 1163 1164 1165 1166 1167 1168

      // todo extract method
      if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) {
        varDataCopy(pInfo->stateKey.pData, val);
      } else {
        memcpy(pInfo->stateKey.pData, val, bytes);
      }
1169 1170 1171 1172 1173
    }
  }

  SResultRow* pResult = NULL;
  pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
1174 1175
  int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid,
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1176
  if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
S
Shengliang Guan 已提交
1177
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
1178 1179 1180
  }

  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false);
L
Liu Jicong 已提交
1181 1182
  applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
                                  pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
1183 1184
}

H
Hongze Cheng 已提交
1185
static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) {
1186 1187
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
1188 1189 1190
  }

  SStateWindowOperatorInfo* pInfo = pOperator->info;
1191
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
1192

1193 1194 1195
  SExprSupp* pSup = &pOperator->exprSupp;
  int32_t    order = TSDB_ORDER_ASC;
  int64_t    st = taosGetTimestampUs();
1196 1197 1198

  SOperatorInfo* downstream = pOperator->pDownstream[0];
  while (1) {
1199
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
1200 1201 1202 1203
    if (pBlock == NULL) {
      break;
    }

1204
    setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true);
1205 1206
    blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);

1207 1208 1209 1210 1211 1212 1213 1214 1215
    // there is an scalar expression that needs to be calculated right before apply the group aggregation.
    if (pInfo->scalarSup.pExprInfo != NULL) {
      pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
                                              pInfo->scalarSup.numOfExprs, NULL);
      if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
      }
    }

1216 1217 1218
    doStateWindowAggImpl(pOperator, pInfo, pBlock);
  }

X
Xiaoyu Wang 已提交
1219
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
1220
  initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
1221 1222
  pOperator->status = OP_RES_TO_RETURN;

1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236
  return TSDB_CODE_SUCCESS;
}

static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SStateWindowOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
  SOptrBasicInfo*           pBInfo = &pInfo->binfo;

  pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
1237
    setOperatorCompleted(pOperator);
1238 1239 1240
    return NULL;
  }

1241
  blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
1242
  while (1) {
1243
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
H
Haojun Liao 已提交
1244
    doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
1245

1246
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
1247
    if (!hasRemain) {
H
Haojun Liao 已提交
1248
      setOperatorCompleted(pOperator);
1249 1250
      break;
    }
1251

1252 1253 1254 1255
    if (pBInfo->pRes->info.rows > 0) {
      break;
    }
  }
1256

1257
  pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
1258
  return (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
1259 1260
}

1261
static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
1262
  SIntervalAggOperatorInfo* pInfo = pOperator->info;
L
Liu Jicong 已提交
1263
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
1264 1265 1266 1267 1268 1269

  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SSDataBlock* pBlock = pInfo->binfo.pRes;
1270 1271 1272 1273
  pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
    return NULL;
  }
1274

1275 1276 1277
  while (1) {
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
    doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
1278

1279 1280
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
    if (!hasRemain) {
H
Haojun Liao 已提交
1281
      setOperatorCompleted(pOperator);
1282
      break;
1283 1284
    }

1285 1286 1287
    if (pBlock->info.rows > 0) {
      break;
    }
1288
  }
1289 1290 1291 1292 1293

  size_t rows = pBlock->info.rows;
  pOperator->resultInfo.totalRows += rows;

  return (rows == 0) ? NULL : pBlock;
1294 1295
}

5
54liuyao 已提交
1296
static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type) {
L
Liu Jicong 已提交
1297
  for (int i = 0; i < num; i++) {
5
54liuyao 已提交
1298 1299
    if (type == STREAM_INVERT) {
      fmSetInvertFunc(pCtx[i].functionId, &(pCtx[i].fpSet));
L
Liu Jicong 已提交
1300
    } else if (type == STREAM_NORMAL) {
5
54liuyao 已提交
1301 1302 1303 1304
      fmSetNormalFunc(pCtx[i].functionId, &(pCtx[i].fpSet));
    }
  }
}
5
54liuyao 已提交
1305

5
54liuyao 已提交
1306
static void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprSupp* pSup, int32_t numOfOutput) {
1307
  SResultRow* pResult = getResultRowByPos(pResultBuf, p1, false);
1308 1309 1310
  if (NULL == pResult) {
    return;
  }
1311

1312
  SqlFunctionCtx* pCtx = pSup->pCtx;
5
54liuyao 已提交
1313
  for (int32_t i = 0; i < numOfOutput; ++i) {
1314
    pCtx[i].resultInfo = getResultEntryInfo(pResult, i, pSup->rowEntryInfoOffset);
5
54liuyao 已提交
1315 1316 1317 1318 1319 1320 1321 1322 1323
    struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo;
    if (fmIsWindowPseudoColumnFunc(pCtx[i].functionId)) {
      continue;
    }
    pResInfo->initialized = false;
    if (pCtx[i].functionId != -1) {
      pCtx[i].fpSet.init(&pCtx[i], pResInfo);
    }
  }
5
54liuyao 已提交
1324
  SFilePage* bufPage = getBufPage(pResultBuf, p1->pageId);
1325 1326 1327
  if (NULL == bufPage) {
    return;
  }
5
54liuyao 已提交
1328 1329
  setBufPageDirty(bufPage, true);
  releaseBufPage(pResultBuf, bufPage);
5
54liuyao 已提交
1330 1331
}

1332
static bool doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId) {
5
54liuyao 已提交
1333 1334 1335
  SStreamIntervalOperatorInfo* pInfo = pOperator->info;
  SWinKey                      key = {.ts = ts, .groupId = groupId};
  tSimpleHashRemove(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey));
1336
  streamStateDel(pInfo->pState, &key);
5
54liuyao 已提交
1337 1338 1339
  return true;
}

1340
static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDataBlock* pBlock, SArray* pUpWins,
1341
                            SSHashObj* pUpdatedMap) {
1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352
  SStreamIntervalOperatorInfo* pInfo = pOperator->info;
  SColumnInfoData*             pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  TSKEY*                       startTsCols = (TSKEY*)pStartTsCol->pData;
  SColumnInfoData*             pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  TSKEY*                       endTsCols = (TSKEY*)pEndTsCol->pData;
  SColumnInfoData*             pCalStTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  TSKEY*                       calStTsCols = (TSKEY*)pCalStTsCol->pData;
  SColumnInfoData*             pCalEnTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  TSKEY*                       calEnTsCols = (TSKEY*)pCalEnTsCol->pData;
  SColumnInfoData*             pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  uint64_t*                    pGpDatas = (uint64_t*)pGpCol->pData;
5
54liuyao 已提交
1353
  for (int32_t i = 0; i < pBlock->info.rows; i++) {
H
Haojun Liao 已提交
1354
    SResultRowInfo dumyInfo = {0};
5
54liuyao 已提交
1355
    dumyInfo.cur.pageId = -1;
H
Haojun Liao 已提交
1356

1357 1358 1359 1360 1361 1362 1363 1364
    STimeWindow win = {0};
    if (IS_FINAL_OP(pInfo)) {
      win.skey = startTsCols[i];
      win.ekey = endTsCols[i];
    } else {
      win = getActiveTimeWindow(NULL, &dumyInfo, startTsCols[i], pInterval, TSDB_ORDER_ASC);
    }

5
54liuyao 已提交
1365
    do {
1366 1367 1368 1369
      if (!inCalSlidingWindow(pInterval, &win, calStTsCols[i], calEnTsCols[i])) {
        getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win);
        continue;
      }
5
54liuyao 已提交
1370 1371
      uint64_t winGpId = pGpDatas[i];
      SWinKey  winRes = {.ts = win.skey, .groupId = winGpId};
dengyihao's avatar
dengyihao 已提交
1372
      void*    chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey));
L
liuyao 已提交
1373 1374 1375 1376
      if (chIds) {
        getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win);
        continue;
      }
dengyihao's avatar
dengyihao 已提交
1377
      bool res = doDeleteWindow(pOperator, win.skey, winGpId);
5
54liuyao 已提交
1378 1379 1380 1381
      if (pUpWins && res) {
        taosArrayPush(pUpWins, &winRes);
      }
      if (pUpdatedMap) {
1382
        tSimpleHashRemove(pUpdatedMap, &winRes, sizeof(SWinKey));
5
54liuyao 已提交
1383 1384
      }
      getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win);
5
54liuyao 已提交
1385
    } while (win.ekey <= endTsCols[i]);
5
54liuyao 已提交
1386 1387 1388
  }
}

1389
static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SSHashObj* resWins) {
1390 1391 1392
  void*   pIte = NULL;
  int32_t iter = 0;
  while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
1393 1394 1395
    SWinKey* pKey = tSimpleHashGetKey(pIte, NULL);
    uint64_t groupId = pKey->groupId;
    TSKEY    ts = pKey->ts;
5
54liuyao 已提交
1396
    int32_t  code = saveWinResultInfo(ts, groupId, *(SRowBuffPos**)pIte, resWins);
5
54liuyao 已提交
1397 1398 1399 1400 1401 1402 1403
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }
  return TSDB_CODE_SUCCESS;
}

5
54liuyao 已提交
1404
static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SInterval* pInterval,
1405
                                         SHashObj* pPullDataMap, SSHashObj* closeWins, SArray* pDelWins,
1406
                                         SOperatorInfo* pOperator) {
5
54liuyao 已提交
1407
  qDebug("===stream===close interval window");
1408 1409 1410
  void*                        pIte = NULL;
  int32_t                      iter = 0;
  SStreamIntervalOperatorInfo* pInfo = pOperator->info;
1411
  int32_t                      delSize = taosArrayGetSize(pDelWins);
5
54liuyao 已提交
1412
  while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
1413
    void*    key = tSimpleHashGetKey(pIte, NULL);
1414 1415 1416 1417 1418 1419 1420 1421 1422
    SWinKey* pWinKey = (SWinKey*)key;
    if (delSize > 0) {
      int32_t index = binarySearchCom(pDelWins, delSize, pWinKey, TSDB_ORDER_DESC, compareWinKey);
      if (index >= 0 && 0 == compareWinKey(pWinKey, pDelWins, index)) {
        taosArrayRemove(pDelWins, index);
        delSize = taosArrayGetSize(pDelWins);
      }
    }

5
54liuyao 已提交
1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441
    void*       chIds = taosHashGet(pPullDataMap, pWinKey, sizeof(SWinKey));
    STimeWindow win = {
        .skey = pWinKey->ts,
        .ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1,
    };
    if (isCloseWindow(&win, pTwSup)) {
      if (chIds && pPullDataMap) {
        SArray* chAy = *(SArray**)chIds;
        int32_t size = taosArrayGetSize(chAy);
        qDebug("===stream===window %" PRId64 " wait child size:%d", pWinKey->ts, size);
        for (int32_t i = 0; i < size; i++) {
          qDebug("===stream===window %" PRId64 " wait child id:%d", pWinKey->ts, *(int32_t*)taosArrayGet(chAy, i));
        }
        continue;
      } else if (pPullDataMap) {
        qDebug("===stream===close window %" PRId64, pWinKey->ts);
      }

      if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
5
54liuyao 已提交
1442
        int32_t code = saveWinResult(pWinKey, *(SRowBuffPos**)pIte, closeWins);
5
54liuyao 已提交
1443 1444 1445 1446 1447
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }
      tSimpleHashIterateRemove(pHashMap, pWinKey, sizeof(SWinKey), &pIte, &iter);
1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459
    }
  }
  return TSDB_CODE_SUCCESS;
}

STimeWindow getFinalTimeWindow(int64_t ts, SInterval* pInterval) {
  STimeWindow w = {.skey = ts, .ekey = INT64_MAX};
  w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
  return w;
}

static void closeChildIntervalWindow(SOperatorInfo* pOperator, SArray* pChildren, TSKEY maxTs) {
5
54liuyao 已提交
1460 1461
  int32_t size = taosArrayGetSize(pChildren);
  for (int32_t i = 0; i < size; i++) {
1462 1463
    SOperatorInfo*               pChildOp = taosArrayGetP(pChildren, i);
    SStreamIntervalOperatorInfo* pChInfo = pChildOp->info;
5
54liuyao 已提交
1464
    ASSERTS(pChInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE, "children trigger type should be at once");
5
54liuyao 已提交
1465
    pChInfo->twAggSup.maxTs = TMAX(pChInfo->twAggSup.maxTs, maxTs);
1466
    closeStreamIntervalWindow(pChInfo->aggSup.pResultRowHashTable, &pChInfo->twAggSup, &pChInfo->interval, NULL, NULL,
1467
                              NULL, pOperator);
1468 1469 1470
  }
}

1471 1472
static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWins, int32_t* index,
                                SSDataBlock* pBlock) {
1473 1474 1475 1476 1477 1478 1479 1480
  blockDataCleanup(pBlock);
  int32_t size = taosArrayGetSize(pWins);
  if (*index == size) {
    *index = 0;
    taosArrayClear(pWins);
    return;
  }
  blockDataEnsureCapacity(pBlock, size - *index);
1481
  uint64_t uid = 0;
1482
  for (int32_t i = *index; i < size; i++) {
H
Haojun Liao 已提交
1483
    SWinKey* pWin = taosArrayGet(pWins, i);
1484 1485
    void*    tbname = NULL;
    streamStateGetParName(pInfo->pState, pWin->groupId, &tbname);
1486 1487 1488 1489 1490 1491 1492
    if (tbname == NULL) {
      appendOneRowToStreamSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, NULL);
    } else {
      char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN];
      STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
      appendOneRowToStreamSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, parTbName);
    }
dengyihao's avatar
dengyihao 已提交
1493
    streamFreeVal(tbname);
1494
    (*index)++;
5
54liuyao 已提交
1495 1496 1497
  }
}

1498
static void destroyStateWindowOperatorInfo(void* param) {
1499
  SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
1500
  cleanupBasicInfo(&pInfo->binfo);
1501
  taosMemoryFreeClear(pInfo->stateKey.pData);
1502
  cleanupExprSupp(&pInfo->scalarSup);
D
dapan1121 已提交
1503 1504 1505
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
  cleanupAggSup(&pInfo->aggSup);
  cleanupGroupResInfo(&pInfo->groupResInfo);
1506

D
dapan1121 已提交
1507
  taosMemoryFreeClear(param);
1508 1509
}

H
Haojun Liao 已提交
1510
static void freeItem(void* param) {
L
Liu Jicong 已提交
1511
  SGroupKeys* pKey = (SGroupKeys*)param;
H
Haojun Liao 已提交
1512 1513 1514
  taosMemoryFree(pKey->pData);
}

1515
void destroyIntervalOperatorInfo(void* param) {
1516
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param;
1517
  cleanupBasicInfo(&pInfo->binfo);
1518
  cleanupAggSup(&pInfo->aggSup);
1519 1520 1521 1522
  cleanupExprSupp(&pInfo->scalarSupp);

  tdListFree(pInfo->binfo.resultRowInfo.openWindow);

H
Haojun Liao 已提交
1523 1524 1525 1526
  pInfo->pInterpCols = taosArrayDestroy(pInfo->pInterpCols);
  taosArrayDestroyEx(pInfo->pPrevValues, freeItem);

  pInfo->pPrevValues = NULL;
1527

H
Haojun Liao 已提交
1528 1529
  cleanupGroupResInfo(&pInfo->groupResInfo);
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
D
dapan1121 已提交
1530
  taosMemoryFreeClear(param);
1531 1532
}

1533
void destroyStreamFinalIntervalOperatorInfo(void* param) {
1534
  SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)param;
1535
  cleanupBasicInfo(&pInfo->binfo);
5
54liuyao 已提交
1536
  cleanupAggSup(&pInfo->aggSup);
L
Liu Jicong 已提交
1537
  // it should be empty.
5
54liuyao 已提交
1538 1539 1540
  taosHashCleanup(pInfo->pPullDataMap);
  taosArrayDestroy(pInfo->pPullWins);
  blockDataDestroy(pInfo->pPullDataRes);
L
Liu Jicong 已提交
1541 1542
  taosArrayDestroy(pInfo->pDelWins);
  blockDataDestroy(pInfo->pDelRes);
5
54liuyao 已提交
1543
  streamFileStateDestroy(pInfo->pState->pFileState);
1544
  taosMemoryFreeClear(pInfo->pState);
5
54liuyao 已提交
1545

1546
  nodesDestroyNode((SNode*)pInfo->pPhyNode);
5
54liuyao 已提交
1547
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
5
54liuyao 已提交
1548
  pInfo->groupResInfo.pRows = taosArrayDestroy(pInfo->groupResInfo.pRows);
5
54liuyao 已提交
1549
  cleanupExprSupp(&pInfo->scalarSupp);
1550

D
dapan1121 已提交
1551
  taosMemoryFreeClear(param);
5
54liuyao 已提交
1552 1553
}

1554
static bool allInvertible(SqlFunctionCtx* pFCtx, int32_t numOfCols) {
5
54liuyao 已提交
1555
  for (int32_t i = 0; i < numOfCols; i++) {
5
54liuyao 已提交
1556
    if (fmIsUserDefinedFunc(pFCtx[i].functionId) || !fmIsInvertible(pFCtx[i].functionId)) {
5
54liuyao 已提交
1557 1558 1559 1560 1561 1562
      return false;
    }
  }
  return true;
}

1563
static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SIntervalAggOperatorInfo* pInfo) {
1564 1565 1566
  // the primary timestamp column
  bool needed = false;

L
Liu Jicong 已提交
1567
  for (int32_t i = 0; i < numOfCols; ++i) {
1568
    SExprInfo* pExpr = pCtx[i].pExpr;
H
Haojun Liao 已提交
1569
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
1570
      needed = true;
H
Haojun Liao 已提交
1571
      break;
1572 1573 1574
    }
  }

H
Haojun Liao 已提交
1575 1576 1577
  if (needed) {
    pInfo->pInterpCols = taosArrayInit(4, sizeof(SColumn));
    pInfo->pPrevValues = taosArrayInit(4, sizeof(SGroupKeys));
1578

H
Haojun Liao 已提交
1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593
    {  // ts column
      SColumn c = {0};
      c.colId = 1;
      c.slotId = pInfo->primaryTsIndex;
      c.type = TSDB_DATA_TYPE_TIMESTAMP;
      c.bytes = sizeof(int64_t);
      taosArrayPush(pInfo->pInterpCols, &c);

      SGroupKeys key;
      key.bytes = c.bytes;
      key.type = c.type;
      key.isNull = true;  // to denote no value is assigned yet
      key.pData = taosMemoryCalloc(1, c.bytes);
      taosArrayPush(pInfo->pPrevValues, &key);
    }
1594 1595
  }

X
Xiaoyu Wang 已提交
1596
  for (int32_t i = 0; i < numOfCols; ++i) {
1597 1598
    SExprInfo* pExpr = pCtx[i].pExpr;

H
Haojun Liao 已提交
1599
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
1600 1601 1602
      SFunctParam* pParam = &pExpr->base.pParam[0];

      SColumn c = *pParam->pCol;
1603
      taosArrayPush(pInfo->pInterpCols, &c);
1604 1605

      SGroupKeys key = {0};
X
Xiaoyu Wang 已提交
1606 1607
      key.bytes = c.bytes;
      key.type = c.type;
1608
      key.isNull = false;
X
Xiaoyu Wang 已提交
1609
      key.pData = taosMemoryCalloc(1, c.bytes);
1610
      taosArrayPush(pInfo->pPrevValues, &key);
1611 1612 1613 1614 1615 1616
    }
  }

  return needed;
}

L
liuyao 已提交
1617
void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamIntervalOperatorInfo* pInfo) {
1618
  if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
L
liuyao 已提交
1619
    initIntervalDownStream(downstream->pDownstream[0], type, pInfo);
1620 1621
    return;
  }
5
54liuyao 已提交
1622
  SStreamScanInfo* pScanInfo = downstream->info;
1623
  pScanInfo->windowSup.parentType = type;
L
liuyao 已提交
1624
  pScanInfo->windowSup.pIntervalAggSup =  &pInfo->aggSup;
5
54liuyao 已提交
1625
  if (!pScanInfo->igCheckUpdate && !pScanInfo->pUpdateInfo) {
L
liuyao 已提交
1626
    pScanInfo->pUpdateInfo = updateInfoInitP(&pInfo->interval, pInfo->twAggSup.waterMark);
5
54liuyao 已提交
1627
  }
L
liuyao 已提交
1628 1629 1630
  pScanInfo->interval = pInfo->interval;
  pScanInfo->twAggSup = pInfo->twAggSup;
  pScanInfo->pState = pInfo->pState;
5
54liuyao 已提交
1631 1632
}

H
Haojun Liao 已提交
1633 1634
void initStreamFunciton(SqlFunctionCtx* pCtx, int32_t numOfExpr) {
  for (int32_t i = 0; i < numOfExpr; i++) {
L
Liu Jicong 已提交
1635
    //    pCtx[i].isStream = true;
H
Haojun Liao 已提交
1636 1637 1638
  }
}

H
Haojun Liao 已提交
1639
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode,
5
54liuyao 已提交
1640
                                          SExecTaskInfo* pTaskInfo) {
1641
  SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
L
Liu Jicong 已提交
1642
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
1643 1644 1645 1646
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

H
Haojun Liao 已提交
1647
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->window.node.pOutputDataBlockDesc);
H
Haojun Liao 已提交
1648 1649 1650 1651 1652 1653
  initBasicInfo(&pInfo->binfo, pResBlock);

  SExprSupp* pSup = &pOperator->exprSupp;
  pInfo->primaryTsIndex = ((SColumnNode*)pPhyNode->window.pTspk)->slotId;

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
H
Haojun Liao 已提交
1654 1655
  initResultSizeInfo(&pOperator->resultInfo, 512);
  blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
H
Haojun Liao 已提交
1656 1657 1658

  int32_t    num = 0;
  SExprInfo* pExprInfo = createExprInfo(pPhyNode->window.pFuncs, NULL, &num);
L
Liu Jicong 已提交
1659 1660
  int32_t    code =
      initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState);
H
Haojun Liao 已提交
1661 1662 1663 1664 1665
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  SInterval interval = {.interval = pPhyNode->interval,
1666 1667 1668 1669 1670
                        .sliding = pPhyNode->sliding,
                        .intervalUnit = pPhyNode->intervalUnit,
                        .slidingUnit = pPhyNode->slidingUnit,
                        .offset = pPhyNode->offset,
                        .precision = ((SColumnNode*)pPhyNode->window.pTspk)->node.resType.precision};
H
Haojun Liao 已提交
1671 1672 1673 1674 1675 1676 1677

  STimeWindowAggSupp as = {
      .waterMark = pPhyNode->window.watermark,
      .calTrigger = pPhyNode->window.triggerType,
      .maxTs = INT64_MIN,
  };

L
Liu Jicong 已提交
1678
  pInfo->win = pTaskInfo->window;
1679 1680
  pInfo->inputOrder = (pPhyNode->window.inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
  pInfo->resultTsOrder = (pPhyNode->window.outputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
H
Haojun Liao 已提交
1681 1682
  pInfo->interval = interval;
  pInfo->twAggSup = as;
1683
  pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock;
1684 1685 1686 1687

  if (pPhyNode->window.pExprs != NULL) {
    int32_t    numOfScalar = 0;
    SExprInfo* pScalarExprInfo = createExprInfo(pPhyNode->window.pExprs, NULL, &numOfScalar);
H
Haojun Liao 已提交
1688
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar);
1689 1690 1691 1692
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }
1693

H
Haojun Liao 已提交
1694 1695 1696 1697 1698
  code = filterInitFromNode((SNode*)pPhyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

1699
  initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
H
Haojun Liao 已提交
1700
  pInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, num, pInfo);
1701
  if (pInfo->timeWindowInterpo) {
1702
    pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
H
Haojun Liao 已提交
1703 1704 1705
    if (pInfo->binfo.resultRowInfo.openWindow == NULL) {
      goto _error;
    }
1706
  }
1707

1708
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
L
Liu Jicong 已提交
1709 1710
  setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
1711

L
Liu Jicong 已提交
1712 1713
  pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, destroyIntervalOperatorInfo,
                                         optrDefaultBufFn, NULL);
1714 1715 1716 1717 1718 1719 1720 1721

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

L
Liu Jicong 已提交
1722
_error:
H
Haojun Liao 已提交
1723 1724 1725
  if (pInfo != NULL) {
    destroyIntervalOperatorInfo(pInfo);
  }
1726 1727 1728 1729 1730
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

1731
// todo handle multiple timeline cases. assume no timeline interweaving
1732 1733
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
1734
  SExprSupp*     pSup = &pOperator->exprSupp;
1735

1736
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
1737 1738

  bool    masterScan = true;
1739
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
H
Haojun Liao 已提交
1740
  int64_t gid = pBlock->info.id.groupId;
1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754

  int64_t gap = pInfo->gap;

  if (!pInfo->reptScan) {
    pInfo->reptScan = true;
    pInfo->winSup.prevTs = INT64_MIN;
  }

  SWindowRowsSup* pRowSup = &pInfo->winSup;
  pRowSup->numOfRows = 0;

  // In case of ascending or descending order scan data, only one time window needs to be kepted for each table.
  TSKEY* tsList = (TSKEY*)pColInfoData->pData;
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
1755 1756 1757
    if (gid != pRowSup->groupId || pInfo->winSup.prevTs == INT64_MIN) {
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
      doKeepTuple(pRowSup, tsList[j], gid);
H
Haojun Liao 已提交
1758 1759
    } else if (((tsList[j] - pRowSup->prevTs >= 0) && (tsList[j] - pRowSup->prevTs <= gap)) ||
               ((pRowSup->prevTs - tsList[j] >= 0) && (pRowSup->prevTs - tsList[j] <= gap))) {
1760
      // The gap is less than the threshold, so it belongs to current session window that has been opened already.
1761
      doKeepTuple(pRowSup, tsList[j], gid);
1762 1763 1764 1765 1766 1767 1768 1769 1770 1771
      if (j == 0 && pRowSup->startRowIndex != 0) {
        pRowSup->startRowIndex = 0;
      }
    } else {  // start a new session window
      SResultRow* pResult = NULL;

      // keep the time window for the closed time window.
      STimeWindow window = pRowSup->win;

      pRowSup->win.ekey = pRowSup->win.skey;
1772 1773
      int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
                                           numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1774
      if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
S
Shengliang Guan 已提交
1775
        T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
1776 1777 1778 1779
      }

      // pInfo->numOfRows data belong to the current session window
      updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false);
H
Haojun Liao 已提交
1780
      applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
L
Liu Jicong 已提交
1781
                                      pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
1782 1783

      // here we start a new session window
1784 1785
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
      doKeepTuple(pRowSup, tsList[j], gid);
1786 1787 1788 1789 1790
    }
  }

  SResultRow* pResult = NULL;
  pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
1791 1792
  int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid,
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1793
  if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
S
Shengliang Guan 已提交
1794
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
1795 1796 1797
  }

  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false);
L
Liu Jicong 已提交
1798 1799
  applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
                                  pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
1800 1801
}

1802
static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
1803 1804 1805 1806 1807 1808
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SSessionAggOperatorInfo* pInfo = pOperator->info;
  SOptrBasicInfo*          pBInfo = &pInfo->binfo;
1809
  SExprSupp*               pSup = &pOperator->exprSupp;
1810 1811

  if (pOperator->status == OP_RES_TO_RETURN) {
1812
    while (1) {
1813
      doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
H
Haojun Liao 已提交
1814
      doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
1815

1816
      bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
1817
      if (!hasRemain) {
H
Haojun Liao 已提交
1818
        setOperatorCompleted(pOperator);
1819 1820
        break;
      }
1821

1822 1823 1824 1825 1826
      if (pBInfo->pRes->info.rows > 0) {
        break;
      }
    }
    pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
1827
    return (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
1828 1829
  }

1830 1831 1832
  int64_t st = taosGetTimestampUs();
  int32_t order = TSDB_ORDER_ASC;

1833 1834 1835
  SOperatorInfo* downstream = pOperator->pDownstream[0];

  while (1) {
1836
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
1837 1838 1839 1840 1841
    if (pBlock == NULL) {
      break;
    }

    // the pDataBlock are always the same one, no need to call this again
1842
    setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true);
1843 1844
    blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);

1845 1846 1847
    doSessionWindowAggImpl(pOperator, pInfo, pBlock);
  }

1848 1849
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;

1850 1851 1852
  // restore the value
  pOperator->status = OP_RES_TO_RETURN;

1853
  initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
1854
  blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
1855
  while (1) {
1856
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
H
Haojun Liao 已提交
1857
    doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
1858

1859
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
1860
    if (!hasRemain) {
H
Haojun Liao 已提交
1861
      setOperatorCompleted(pOperator);
1862 1863
      break;
    }
1864

1865 1866 1867 1868 1869
    if (pBInfo->pRes->info.rows > 0) {
      break;
    }
  }
  pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
1870
  return (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
1871 1872
}

H
Haojun Liao 已提交
1873
// todo make this as an non-blocking operator
1874 1875
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode,
                                             SExecTaskInfo* pTaskInfo) {
1876 1877 1878 1879 1880 1881
  SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

1882 1883 1884
  int32_t      tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
  SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr;

1885 1886 1887
  if (pStateNode->window.pExprs != NULL) {
    int32_t    numOfScalarExpr = 0;
    SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalarExpr);
H
Hongze Cheng 已提交
1888
    int32_t    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr);
1889 1890 1891 1892 1893
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

1894
  pInfo->stateCol = extractColumnFromColumnNode(pColNode);
1895 1896 1897 1898 1899 1900 1901
  pInfo->stateKey.type = pInfo->stateCol.type;
  pInfo->stateKey.bytes = pInfo->stateCol.bytes;
  pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes);
  if (pInfo->stateKey.pData == NULL) {
    goto _error;
  }

H
Haojun Liao 已提交
1902 1903 1904 1905 1906
  int32_t code = filterInitFromNode((SNode*)pStateNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

1907 1908
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;

1909 1910
  int32_t    num = 0;
  SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num);
1911
  initResultSizeInfo(&pOperator->resultInfo, 4096);
H
Haojun Liao 已提交
1912

L
Liu Jicong 已提交
1913 1914
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
                    pTaskInfo->streamInfo.pState);
1915 1916 1917 1918
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
1919
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pStateNode->window.node.pOutputDataBlockDesc);
1920
  initBasicInfo(&pInfo->binfo, pResBlock);
1921
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
1922

L
Liu Jicong 已提交
1923 1924
  pInfo->twAggSup =
      (STimeWindowAggSupp){.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType};
1925

1926 1927
  initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);

X
Xiaoyu Wang 已提交
1928
  pInfo->tsSlotId = tsSlotId;
1929

L
Liu Jicong 已提交
1930 1931
  setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
L
Liu Jicong 已提交
1932 1933
  pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, destroyStateWindowOperatorInfo,
                                         optrDefaultBufFn, NULL);
1934

1935 1936 1937 1938 1939
  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

1940 1941
  return pOperator;

L
Liu Jicong 已提交
1942
_error:
H
Haojun Liao 已提交
1943 1944 1945 1946
  if (pInfo != NULL) {
    destroyStateWindowOperatorInfo(pInfo);
  }

1947 1948
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
1949 1950 1951
  return NULL;
}

1952
void destroySWindowOperatorInfo(void* param) {
1953
  SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*)param;
1954 1955 1956
  if (pInfo == NULL) {
    return;
  }
1957

1958
  cleanupBasicInfo(&pInfo->binfo);
H
Haojun Liao 已提交
1959 1960 1961 1962
  colDataDestroy(&pInfo->twAggSup.timeWindowData);

  cleanupAggSup(&pInfo->aggSup);
  cleanupGroupResInfo(&pInfo->groupResInfo);
D
dapan1121 已提交
1963
  taosMemoryFreeClear(param);
1964 1965
}

H
Haojun Liao 已提交
1966
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
1967
                                            SExecTaskInfo* pTaskInfo) {
1968 1969 1970 1971 1972 1973
  SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo));
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

1974
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
1975
  initResultSizeInfo(&pOperator->resultInfo, 4096);
1976

1977
  int32_t      numOfCols = 0;
H
Haojun Liao 已提交
1978
  SExprInfo*   pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols);
H
Haojun Liao 已提交
1979
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc);
H
Haojun Liao 已提交
1980
  initBasicInfo(&pInfo->binfo, pResBlock);
H
Haojun Liao 已提交
1981

L
Liu Jicong 已提交
1982 1983
  int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
                            pTaskInfo->streamInfo.pState);
1984 1985 1986 1987
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
1988 1989 1990 1991
  pInfo->twAggSup.waterMark = pSessionNode->window.watermark;
  pInfo->twAggSup.calTrigger = pSessionNode->window.triggerType;
  pInfo->gap = pSessionNode->gap;

1992
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
1993 1994
  initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);

1995
  pInfo->tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
L
Liu Jicong 已提交
1996 1997 1998
  pInfo->binfo.pRes = pResBlock;
  pInfo->winSup.prevTs = INT64_MIN;
  pInfo->reptScan = false;
H
Haojun Liao 已提交
1999 2000 2001 2002
  code = filterInitFromNode((SNode*)pSessionNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
H
Haojun Liao 已提交
2003

L
Liu Jicong 已提交
2004 2005
  setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
L
Liu Jicong 已提交
2006 2007
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSessionWindowAgg, NULL, destroySWindowOperatorInfo,
                                         optrDefaultBufFn, NULL);
2008 2009
  pOperator->pTaskInfo = pTaskInfo;
  code = appendDownstream(pOperator, &downstream, 1);
2010 2011 2012 2013
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

2014 2015
  return pOperator;

L
Liu Jicong 已提交
2016
_error:
2017
  destroySWindowOperatorInfo(pInfo);
2018 2019 2020
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
L
Liu Jicong 已提交
2021
}
5
54liuyao 已提交
2022

5
54liuyao 已提交
2023
void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int32_t numOfOutput,
2024
                      SExecTaskInfo* pTaskInfo, SColumnInfoData* pTimeWindowData) {
5
54liuyao 已提交
2025 2026
  for (int32_t k = 0; k < numOfOutput; ++k) {
    if (fmIsWindowPseudoColumnFunc(pDestCtx[k].functionId)) {
2027 2028 2029 2030 2031
      if (!pTimeWindowData) {
        continue;
      }

      SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pDestCtx[k]);
L
Liu Jicong 已提交
2032 2033
      char*                p = GET_ROWCELL_INTERBUF(pEntryInfo);
      SColumnInfoData      idata = {0};
2034 2035 2036 2037 2038 2039 2040 2041
      idata.info.type = TSDB_DATA_TYPE_BIGINT;
      idata.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
      idata.pData = p;

      SScalarParam out = {.columnData = &idata};
      SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
      pDestCtx[k].sfp.process(&tw, 1, &out);
      pEntryInfo->numOfRes = 1;
L
Liu Jicong 已提交
2042
    } else if (functionNeedToExecute(&pDestCtx[k]) && pDestCtx[k].fpSet.combine != NULL) {
2043
      int32_t code = pDestCtx[k].fpSet.combine(&pDestCtx[k], &pSourceCtx[k]);
5
54liuyao 已提交
2044
      if (code != TSDB_CODE_SUCCESS) {
5
54liuyao 已提交
2045
        qError("%s apply combine functions error, code: %s", GET_TASKID(pTaskInfo), tstrerror(code));
5
54liuyao 已提交
2046
      }
5
54liuyao 已提交
2047 2048 2049 2050
    } else if (pDestCtx[k].fpSet.combine == NULL) {
      char* funName = fmGetFuncName(pDestCtx[k].functionId);
      qError("%s error, combine funcion for %s is not implemented", GET_TASKID(pTaskInfo), funName);
      taosMemoryFreeClear(funName);
5
54liuyao 已提交
2051 2052 2053 2054
    }
  }
}

dengyihao's avatar
dengyihao 已提交
2055
bool hasIntervalWindow(SStreamState* pState, SWinKey* pKey) { return streamStateCheck(pState, pKey); }
5
54liuyao 已提交
2056 2057

int32_t setIntervalOutputBuf(SStreamState* pState, STimeWindow* win, SRowBuffPos** pResult, int64_t groupId,
dengyihao's avatar
dengyihao 已提交
2058 2059
                             SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset,
                             SAggSupporter* pAggSup) {
5
54liuyao 已提交
2060 2061 2062 2063 2064 2065 2066 2067 2068 2069 2070 2071 2072
  SWinKey key = {
      .ts = win->skey,
      .groupId = groupId,
  };
  char*   value = NULL;
  int32_t size = pAggSup->resultRowSize;

  if (streamStateAddIfNotExist(pState, &key, (void**)&value, &size) < 0) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  *pResult = (SRowBuffPos*)value;
  SResultRow* res = (SResultRow*)((*pResult)->pRowBuff);
  // set time window for current result
dengyihao's avatar
dengyihao 已提交
2073
  res->win = (*win);
5
54liuyao 已提交
2074 2075
  setResultRowInitCtx(res, pCtx, numOfOutput, rowEntryInfoOffset);
  return TSDB_CODE_SUCCESS;
2076 2077
}

2078
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup) {
L
liuyao 已提交
2079
  if (pTwSup->maxTs != INT64_MIN && pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark) {
5
54liuyao 已提交
2080
    SWinKey key = {.ts = pWin->skey, .groupId = groupId};
L
liuyao 已提交
2081
    if (!hasIntervalWindow(pState, &key)) {
5
54liuyao 已提交
2082
      return true;
5
54liuyao 已提交
2083
    }
5
54liuyao 已提交
2084
    return false;
5
54liuyao 已提交
2085 2086 2087 2088
  }
  return false;
}

L
Liu Jicong 已提交
2089 2090 2091 2092
int32_t getNexWindowPos(SInterval* pInterval, SDataBlockInfo* pBlockInfo, TSKEY* tsCols, int32_t startPos, TSKEY eKey,
                        STimeWindow* pNextWin) {
  int32_t forwardRows =
      getNumOfRowsInTimeWindow(pBlockInfo, tsCols, startPos, eKey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
5
54liuyao 已提交
2093 2094 2095 2096
  int32_t prevEndPos = forwardRows - 1 + startPos;
  return getNextQualifiedWindow(pInterval, pNextWin, pBlockInfo, tsCols, prevEndPos, TSDB_ORDER_ASC);
}

H
Haojun Liao 已提交
2097
void addPullWindow(SHashObj* pMap, SWinKey* pWinRes, int32_t size) {
5
54liuyao 已提交
2098 2099 2100 2101
  SArray* childIds = taosArrayInit(8, sizeof(int32_t));
  for (int32_t i = 0; i < size; i++) {
    taosArrayPush(childIds, &i);
  }
H
Haojun Liao 已提交
2102
  taosHashPut(pMap, pWinRes, sizeof(SWinKey), &childIds, sizeof(void*));
5
54liuyao 已提交
2103 2104 2105 2106
}

static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; }

2107
static void clearStreamIntervalOperator(SStreamIntervalOperatorInfo* pInfo) {
2108
  tSimpleHashClear(pInfo->aggSup.pResultRowHashTable);
5
54liuyao 已提交
2109
  clearDiskbasedBuf(pInfo->aggSup.pResultBuf);
2110
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
L
Liu Jicong 已提交
2111
  pInfo->aggSup.currentPageId = -1;
2112
  streamStateClear(pInfo->pState);
5
54liuyao 已提交
2113 2114
}

5
54liuyao 已提交
2115 2116 2117 2118
static void clearSpecialDataBlock(SSDataBlock* pBlock) {
  if (pBlock->info.rows <= 0) {
    return;
  }
5
54liuyao 已提交
2119 2120 2121
  blockDataCleanup(pBlock);
}

5
54liuyao 已提交
2122 2123 2124 2125 2126 2127
static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pBlock) {
  clearSpecialDataBlock(pBlock);
  int32_t size = taosArrayGetSize(array);
  if (size - (*pIndex) == 0) {
    return;
  }
L
Liu Jicong 已提交
2128
  blockDataEnsureCapacity(pBlock, size - (*pIndex));
2129 2130 2131 2132 2133
  SColumnInfoData* pStartTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pGroupId = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
5
54liuyao 已提交
2134
  for (; (*pIndex) < size; (*pIndex)++) {
L
Liu Jicong 已提交
2135
    SPullWindowInfo* pWin = taosArrayGet(array, (*pIndex));
2136 2137 2138 2139 2140
    colDataSetVal(pStartTs, pBlock->info.rows, (const char*)&pWin->window.skey, false);
    colDataSetVal(pEndTs, pBlock->info.rows, (const char*)&pWin->window.ekey, false);
    colDataSetVal(pGroupId, pBlock->info.rows, (const char*)&pWin->groupId, false);
    colDataSetVal(pCalStartTs, pBlock->info.rows, (const char*)&pWin->calWin.skey, false);
    colDataSetVal(pCalEndTs, pBlock->info.rows, (const char*)&pWin->calWin.ekey, false);
5
54liuyao 已提交
2141 2142 2143 2144 2145 2146 2147 2148 2149
    pBlock->info.rows++;
  }
  if ((*pIndex) == size) {
    *pIndex = 0;
    taosArrayClear(array);
  }
  blockDataUpdateTsWindow(pBlock, 0);
}

5
54liuyao 已提交
2150
void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SInterval* pInterval) {
5
54liuyao 已提交
2151
  SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
L
Liu Jicong 已提交
2152
  TSKEY*           tsData = (TSKEY*)pStartCol->pData;
5
54liuyao 已提交
2153 2154
  SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  TSKEY*           tsEndData = (TSKEY*)pEndCol->pData;
5
54liuyao 已提交
2155
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
L
Liu Jicong 已提交
2156 2157
  uint64_t*        groupIdData = (uint64_t*)pGroupCol->pData;
  int32_t          chId = getChildIndex(pBlock);
5
54liuyao 已提交
2158
  for (int32_t i = 0; i < pBlock->info.rows; i++) {
5
54liuyao 已提交
2159 2160 2161 2162 2163 2164 2165 2166 2167 2168 2169 2170 2171 2172 2173
    TSKEY winTs = tsData[i];
    while (winTs < tsEndData[i]) {
      SWinKey winRes = {.ts = winTs, .groupId = groupIdData[i]};
      void*   chIds = taosHashGet(pMap, &winRes, sizeof(SWinKey));
      if (chIds) {
        SArray* chArray = *(SArray**)chIds;
        int32_t index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
        if (index != -1) {
          qDebug("===stream===window %" PRId64 " delete child id %d", winRes.ts, chId);
          taosArrayRemove(chArray, index);
          if (taosArrayGetSize(chArray) == 0) {
            // pull data is over
            taosArrayDestroy(chArray);
            taosHashRemove(pMap, &winRes, sizeof(SWinKey));
          }
5
54liuyao 已提交
2174 2175
        }
      }
5
54liuyao 已提交
2176
      winTs = taosTimeAdd(winTs, pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
5
54liuyao 已提交
2177 2178 2179
    }
  }
}
5
54liuyao 已提交
2180

2181
static void addRetriveWindow(SArray* wins, SStreamIntervalOperatorInfo* pInfo) {
2182 2183
  int32_t size = taosArrayGetSize(wins);
  for (int32_t i = 0; i < size; i++) {
L
Liu Jicong 已提交
2184
    SWinKey*    winKey = taosArrayGet(wins, i);
2185
    STimeWindow nextWin = getFinalTimeWindow(winKey->ts, &pInfo->interval);
L
liuyao 已提交
2186 2187 2188 2189 2190 2191 2192 2193 2194
    if (isOverdue(nextWin.ekey, &pInfo->twAggSup) && pInfo->ignoreExpiredData) {
      continue;
    }
    void* chIds = taosHashGet(pInfo->pPullDataMap, winKey, sizeof(SWinKey));
    if (!chIds) {
      SPullWindowInfo pull = {
          .window = nextWin, .groupId = winKey->groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey};
      // add pull data request
      if (savePullWindow(&pull, pInfo->pPullWins) == TSDB_CODE_SUCCESS) {
L
liuyao 已提交
2195 2196
        addPullWindow(pInfo->pPullDataMap, winKey, pInfo->numOfChild);
        qDebug("===stream===prepare retrive for delete %" PRId64 ", size:%d", winKey->ts, pInfo->numOfChild);
2197 2198 2199 2200 2201
      }
    }
  }
}

5
54liuyao 已提交
2202 2203 2204 2205 2206 2207
static void clearFunctionContext(SExprSupp* pSup) {
  for (int32_t i = 0; i < pSup->numOfExprs; i++) {
    pSup->pCtx[i].saveHandle.currentPage = -1;
  }
}

5
54liuyao 已提交
2208 2209 2210 2211 2212 2213 2214 2215 2216 2217 2218 2219 2220 2221 2222 2223 2224 2225 2226 2227 2228 2229 2230 2231 2232 2233 2234 2235 2236 2237 2238 2239 2240 2241 2242 2243 2244 2245 2246 2247 2248 2249 2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260 2261 2262 2263 2264 2265 2266 2267 2268 2269 2270 2271 2272 2273 2274 2275 2276 2277 2278 2279 2280 2281 2282 2283 2284 2285 2286 2287 2288
int32_t getOutputBuf(SStreamState* pState, SRowBuffPos* pPos, SResultRow** pResult) {
  return streamStateGetByPos(pState, pPos, (void**)pResult);
}

int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
                                   SGroupResInfo* pGroupResInfo) {
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;

  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);

  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
    SRowBuffPos* pPos = *(SRowBuffPos**)taosArrayGet(pGroupResInfo->pRows, i);
    SResultRow*  pRow = NULL;
    int32_t      code = getOutputBuf(pState, pPos, &pRow);
    uint64_t     groupId = ((SWinKey*)pPos->pKey)->groupId;
    ASSERT(code == 0);
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
    // no results, continue to check the next one
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
      continue;
    }
    if (pBlock->info.id.groupId == 0) {
      pBlock->info.id.groupId = groupId;
      void* tbname = NULL;
      if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) {
        pBlock->info.parTbName[0] = 0;
      } else {
        memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
      }
      streamFreeVal(tbname);
    } else {
      // current value belongs to different group, it can't be packed into one datablock
      if (pBlock->info.id.groupId != groupId) {
        break;
      }
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      ASSERT(pBlock->info.rows > 0);
      break;
    }
    pGroupResInfo->index += 1;

    for (int32_t j = 0; j < numOfExprs; ++j) {
      int32_t slotId = pExprInfo[j].base.resSchema.slotId;

      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
      SResultRowEntryInfo* pEnryInfo = pCtx[j].resultInfo;
      qDebug("initd:%d, complete:%d, null:%d, res:%d", pEnryInfo->initialized, pEnryInfo->complete,
             pEnryInfo->isNullRes, pEnryInfo->numOfRes);
      if (pCtx[j].fpSet.finalize) {
        int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
        if (TAOS_FAILED(code1)) {
          qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1));
          T_LONG_JMP(pTaskInfo->env, code1);
        }
      } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
        // do nothing, todo refactor
      } else {
        // expand the result into multiple rows. E.g., _wstart, top(k, 20)
        // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
        SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
        char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
        for (int32_t k = 0; k < pRow->numOfRows; ++k) {
          colDataSetVal(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
        }
      }
    }

    pBlock->info.rows += pRow->numOfRows;
  }
  pBlock->info.dataLoad = 1;
  blockDataUpdateTsWindow(pBlock, 0);
  return TSDB_CODE_SUCCESS;
}

2289 2290
void doBuildStreamIntervalResult(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock,
                                 SGroupResInfo* pGroupResInfo) {
2291 2292 2293 2294 2295 2296 2297 2298 2299 2300
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  // set output datablock version
  pBlock->info.version = pTaskInfo->version;

  blockDataCleanup(pBlock);
  if (!hasRemainResults(pGroupResInfo)) {
    return;
  }

  // clear the existed group id
H
Haojun Liao 已提交
2301
  pBlock->info.id.groupId = 0;
2302
  buildDataBlockFromGroupRes(pOperator, pState, pBlock, &pOperator->exprSupp, pGroupResInfo);
2303 2304
}

5
54liuyao 已提交
2305 2306 2307 2308 2309 2310 2311 2312 2313 2314 2315
static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
                                           TSKEY* primaryKeys, int32_t prevPosition) {
  int32_t startPos = prevPosition + 1;
  if (startPos == pDataBlockInfo->rows) {
    startPos = -1;
  } else {
    *pNext = getFinalTimeWindow(primaryKeys[startPos], pInterval);
  }
  return startPos;
}

dengyihao's avatar
dengyihao 已提交
2316
static void setStreamDataVersion(SExecTaskInfo* pTaskInfo, int64_t version, int64_t ckId) {
L
liuyao 已提交
2317
  pTaskInfo->streamInfo.dataVersion = version;
L
liuyao 已提交
2318
  pTaskInfo->streamInfo.checkPointId = ckId;
L
liuyao 已提交
2319 2320
}

2321
static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t groupId,
2322
                                    SSHashObj* pUpdatedMap) {
2323
  SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperatorInfo->info;
L
liuyao 已提交
2324
  pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version);
2325 2326 2327 2328 2329 2330 2331

  SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo);
  SExecTaskInfo*  pTaskInfo = pOperatorInfo->pTaskInfo;
  SExprSupp*      pSup = &pOperatorInfo->exprSupp;
  int32_t         numOfOutput = pSup->numOfExprs;
  int32_t         step = 1;
  TSKEY*          tsCols = NULL;
5
54liuyao 已提交
2332
  SRowBuffPos*    pResPos = NULL;
2333 2334 2335 2336 2337 2338 2339 2340
  SResultRow*     pResult = NULL;
  int32_t         forwardRows = 0;

  SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
  tsCols = (int64_t*)pColDataInfo->pData;

  int32_t     startPos = 0;
  TSKEY       ts = getStartTsKey(&pSDataBlock->info.window, tsCols);
5
54liuyao 已提交
2341 2342 2343 2344 2345 2346
  STimeWindow nextWin = {0};
  if (IS_FINAL_OP(pInfo)) {
    nextWin = getFinalTimeWindow(ts, &pInfo->interval);
  } else {
    nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, TSDB_ORDER_ASC);
  }
2347 2348 2349 2350 2351 2352 2353 2354 2355 2356
  while (1) {
    bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup);
    if ((pInfo->ignoreExpiredData && isClosed) || !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) {
      startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin);
      if (startPos < 0) {
        break;
      }
      continue;
    }

L
liuyao 已提交
2357
    if (IS_FINAL_OP(pInfo) && pInfo->numOfChild > 0) {
2358 2359 2360 2361 2362 2363
      bool    ignore = true;
      SWinKey winRes = {
          .ts = nextWin.skey,
          .groupId = groupId,
      };
      void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey));
L
liuyao 已提交
2364
      if (isDeletedStreamWindow(&nextWin, groupId, pInfo->pState, &pInfo->twAggSup) && isClosed && !chIds) {
L
Liu Jicong 已提交
2365 2366
        SPullWindowInfo pull = {
            .window = nextWin, .groupId = groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey};
2367
        // add pull data request
5
54liuyao 已提交
2368
        if (savePullWindow(&pull, pInfo->pPullWins) == TSDB_CODE_SUCCESS) {
L
liuyao 已提交
2369
          addPullWindow(pInfo->pPullDataMap, &winRes, pInfo->numOfChild);
5
54liuyao 已提交
2370
        }
2371 2372 2373 2374 2375 2376 2377 2378 2379 2380 2381 2382 2383 2384 2385
      } else {
        int32_t index = -1;
        SArray* chArray = NULL;
        int32_t chId = 0;
        if (chIds) {
          chArray = *(void**)chIds;
          chId = getChildIndex(pSDataBlock);
          index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
        }
        if (index == -1 || pSDataBlock->info.type == STREAM_PULL_DATA) {
          ignore = false;
        }
      }

      if (ignore) {
L
liuyao 已提交
2386
        startPos = getNextQualifiedFinalWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, startPos);
2387 2388 2389 2390 2391 2392
        if (startPos < 0) {
          break;
        }
        continue;
      }
    }
L
liuyao 已提交
2393

5
54liuyao 已提交
2394
    int32_t code = setIntervalOutputBuf(pInfo->pState, &nextWin, &pResPos, groupId, pSup->pCtx, numOfOutput,
dengyihao's avatar
dengyihao 已提交
2395 2396
                                        pSup->rowEntryInfoOffset, &pInfo->aggSup);
    pResult = (SResultRow*)pResPos->pRowBuff;
2397
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
S
Shengliang Guan 已提交
2398
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
2399
    }
5
54liuyao 已提交
2400 2401 2402 2403 2404 2405
    if (IS_FINAL_OP(pInfo)) {
      forwardRows = 1;
    } else {
      forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey,
                                             NULL, TSDB_ORDER_ASC);
    }
5
54liuyao 已提交
2406 2407

    SWinKey key = {
dengyihao's avatar
dengyihao 已提交
2408 2409
        .ts = pResult->win.skey,
        .groupId = groupId,
5
54liuyao 已提交
2410
    };
2411
    if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) {
5
54liuyao 已提交
2412
      saveWinResult(&key, pResPos, pUpdatedMap);
2413
    }
5
54liuyao 已提交
2414 2415

    if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
5
54liuyao 已提交
2416
      tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), &pResPos, POINTER_BYTES);
5
54liuyao 已提交
2417
    }
dengyihao's avatar
dengyihao 已提交
2418

2419
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
H
Haojun Liao 已提交
2420
    applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
L
Liu Jicong 已提交
2421
                                    pSDataBlock->info.rows, numOfOutput);
5
54liuyao 已提交
2422
    key.ts = nextWin.skey;
dengyihao's avatar
dengyihao 已提交
2423

2424 2425 2426 2427
    if (pInfo->delKey.ts > key.ts) {
      pInfo->delKey = key;
    }
    int32_t prevEndPos = (forwardRows - 1) * step + startPos;
2428 2429 2430 2431 2432 2433 2434 2435 2436 2437 2438 2439 2440
    if (pSDataBlock->info.window.skey <= 0 || pSDataBlock->info.window.ekey <= 0) {
      qError("table uid %" PRIu64 " data block timestamp range may not be calculated! minKey %" PRId64
             ",maxKey %" PRId64,
             pSDataBlock->info.id.uid, pSDataBlock->info.window.skey, pSDataBlock->info.window.ekey);
      blockDataUpdateTsWindow(pSDataBlock, 0);

      // timestamp of the data is incorrect
      if (pSDataBlock->info.window.skey <= 0 || pSDataBlock->info.window.ekey <= 0) {
        qError("table uid %" PRIu64 " data block timestamp is out of range! minKey %" PRId64 ",maxKey %" PRId64,
               pSDataBlock->info.id.uid, pSDataBlock->info.window.skey, pSDataBlock->info.window.ekey);
      }
    }

5
54liuyao 已提交
2441 2442 2443 2444 2445 2446
    if (IS_FINAL_OP(pInfo)) {
      startPos = getNextQualifiedFinalWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos);
    } else {
      startPos =
          getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, TSDB_ORDER_ASC);
    }
2447 2448 2449 2450 2451 2452
    if (startPos < 0) {
      break;
    }
  }
}

5
54liuyao 已提交
2453 2454 2455
static inline int winPosCmprImpl(const void* pKey1, const void* pKey2) {
  SRowBuffPos* pos1 = *(SRowBuffPos**)pKey1;
  SRowBuffPos* pos2 = *(SRowBuffPos**)pKey2;
dengyihao's avatar
dengyihao 已提交
2456 2457
  SWinKey*     pWin1 = (SWinKey*)pos1->pKey;
  SWinKey*     pWin2 = (SWinKey*)pos2->pKey;
5
54liuyao 已提交
2458 2459 2460 2461 2462 2463 2464 2465 2466 2467 2468 2469 2470 2471 2472 2473

  if (pWin1->groupId > pWin2->groupId) {
    return 1;
  } else if (pWin1->groupId < pWin2->groupId) {
    return -1;
  }

  if (pWin1->ts > pWin2->ts) {
    return 1;
  } else if (pWin1->ts < pWin2->ts) {
    return -1;
  }

  return 0;
}

5
54liuyao 已提交
2474
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
2475
  SStreamIntervalOperatorInfo* pInfo = pOperator->info;
L
Liu Jicong 已提交
2476
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
5
54liuyao 已提交
2477 2478
  SOperatorInfo*               downstream = pOperator->pDownstream[0];
  SExprSupp*                   pSup = &pOperator->exprSupp;
2479

5
54liuyao 已提交
2480
  qDebug("interval status %d %s", pOperator->status, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
L
Liu Jicong 已提交
2481

5
54liuyao 已提交
2482 2483 2484
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  } else if (pOperator->status == OP_RES_TO_RETURN) {
5
54liuyao 已提交
2485 2486 2487
    doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes);
    if (pInfo->pPullDataRes->info.rows != 0) {
      // process the rest of the data
5
54liuyao 已提交
2488
      printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
5
54liuyao 已提交
2489 2490 2491
      return pInfo->pPullDataRes;
    }

2492
    doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
2493 2494 2495 2496 2497 2498
    if (pInfo->pDelRes->info.rows != 0) {
      // process the rest of the data
      printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
      return pInfo->pDelRes;
    }

2499
    doBuildStreamIntervalResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
5
54liuyao 已提交
2500 2501 2502
    if (pInfo->binfo.pRes->info.rows != 0) {
      printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
      return pInfo->binfo.pRes;
5
54liuyao 已提交
2503
    }
5
54liuyao 已提交
2504

H
Haojun Liao 已提交
2505
    setOperatorCompleted(pOperator);
5
54liuyao 已提交
2506 2507 2508 2509
    if (!IS_FINAL_OP(pInfo)) {
      clearFunctionContext(&pOperator->exprSupp);
      // semi interval operator clear disk buffer
      clearStreamIntervalOperator(pInfo);
L
liuyao 已提交
2510
      setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId);
5
54liuyao 已提交
2511 2512
      qDebug("===stream===clear semi operator");
    } else {
L
liuyao 已提交
2513 2514
      if (pInfo->twAggSup.maxTs - pInfo->twAggSup.checkPointInterval > pInfo->twAggSup.checkPointTs) {
        streamStateCommit(pInfo->pState);
L
liuyao 已提交
2515
        streamStateDeleteCheckPoint(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark);
L
liuyao 已提交
2516 2517
        pInfo->twAggSup.checkPointTs = pInfo->twAggSup.maxTs;
      }
L
liuyao 已提交
2518
      qDebug("===stream===interval final close");
5
54liuyao 已提交
2519 2520
    }
    return NULL;
5
54liuyao 已提交
2521
  } else {
5
54liuyao 已提交
2522
    if (!IS_FINAL_OP(pInfo)) {
2523
      doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
2524 2525 2526 2527 2528
      if (pInfo->pDelRes->info.rows != 0) {
        // process the rest of the data
        printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
        return pInfo->pDelRes;
      }
5
54liuyao 已提交
2529
    }
5
54liuyao 已提交
2530 2531
  }

5
54liuyao 已提交
2532
  if (!pInfo->pUpdated) {
L
liuyao 已提交
2533
    pInfo->pUpdated = taosArrayInit(4096, POINTER_BYTES);
5
54liuyao 已提交
2534 2535 2536
  }
  if (!pInfo->pUpdatedMap) {
    _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
L
liuyao 已提交
2537
    pInfo->pUpdatedMap = tSimpleHashInit(4096, hashFn);
5
54liuyao 已提交
2538 2539
  }

5
54liuyao 已提交
2540 2541 2542
  while (1) {
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
    if (pBlock == NULL) {
5
54liuyao 已提交
2543
      pOperator->status = OP_RES_TO_RETURN;
L
Liu Jicong 已提交
2544 2545
      qDebug("===stream===return data:%s. recv datablock num:%" PRIu64,
             IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", pInfo->numOfDatapack);
5
54liuyao 已提交
2546
      pInfo->numOfDatapack = 0;
5
54liuyao 已提交
2547 2548
      break;
    }
5
54liuyao 已提交
2549
    pInfo->numOfDatapack++;
5
54liuyao 已提交
2550
    printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval final recv" : "interval semi recv");
2551

H
Haojun Liao 已提交
2552
    if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) {
5
54liuyao 已提交
2553
      pInfo->binfo.pRes->info.type = pBlock->info.type;
2554 2555
    } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
               pBlock->info.type == STREAM_CLEAR) {
2556
      SArray* delWins = taosArrayInit(8, sizeof(SWinKey));
5
54liuyao 已提交
2557
      doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap);
2558
      if (IS_FINAL_OP(pInfo)) {
2559 2560 2561
        addRetriveWindow(delWins, pInfo);
        taosArrayAddAll(pInfo->pDelWins, delWins);
        taosArrayDestroy(delWins);
2562 2563
        continue;
      }
5
54liuyao 已提交
2564
      removeResults(delWins, pInfo->pUpdatedMap);
2565 2566
      taosArrayAddAll(pInfo->pDelWins, delWins);
      taosArrayDestroy(delWins);
L
liuyao 已提交
2567 2568 2569 2570 2571 2572 2573 2574

      doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
      if (pInfo->pDelRes->info.rows != 0) {
        // process the rest of the data
        printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
        return pInfo->pDelRes;
      }

2575
      break;
5
54liuyao 已提交
2576
    } else if (pBlock->info.type == STREAM_GET_ALL && IS_FINAL_OP(pInfo)) {
5
54liuyao 已提交
2577
      getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap);
5
54liuyao 已提交
2578
      continue;
5
54liuyao 已提交
2579
    } else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) {
5
54liuyao 已提交
2580 2581
      doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pInfo->pUpdatedMap);
      if (taosArrayGetSize(pInfo->pUpdated) > 0) {
5
54liuyao 已提交
2582 2583 2584
        break;
      }
      continue;
L
Liu Jicong 已提交
2585
    } else if (pBlock->info.type == STREAM_PULL_OVER && IS_FINAL_OP(pInfo)) {
5
54liuyao 已提交
2586
      processPullOver(pBlock, pInfo->pPullDataMap, &pInfo->interval);
5
54liuyao 已提交
2587
      continue;
5
54liuyao 已提交
2588 2589 2590 2591
    } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
      return pBlock;
    } else {
      ASSERTS(pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
5
54liuyao 已提交
2592
    }
5
54liuyao 已提交
2593

5
54liuyao 已提交
2594 2595 2596 2597
    if (pInfo->scalarSupp.pExprInfo != NULL) {
      SExprSupp* pExprSup = &pInfo->scalarSupp;
      projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
    }
2598
    setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
5
54liuyao 已提交
2599 2600 2601 2602
    doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pInfo->pUpdatedMap);
    pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
    pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.watermark);
    pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.skey);
5
54liuyao 已提交
2603
  }
S
shenglian zhou 已提交
2604

5
54liuyao 已提交
2605
  removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins);
5
54liuyao 已提交
2606
  if (IS_FINAL_OP(pInfo)) {
2607
    closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval,
5
54liuyao 已提交
2608
                              pInfo->pPullDataMap, pInfo->pUpdatedMap, pInfo->pDelWins, pOperator);
5
54liuyao 已提交
2609
  }
2610
  pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs;
5
54liuyao 已提交
2611

2612 2613 2614
  void*   pIte = NULL;
  int32_t iter = 0;
  while ((pIte = tSimpleHashIterate(pInfo->pUpdatedMap, pIte, &iter)) != NULL) {
5
54liuyao 已提交
2615
    taosArrayPush(pInfo->pUpdated, pIte);
5
54liuyao 已提交
2616
  }
2617
  tSimpleHashCleanup(pInfo->pUpdatedMap);
5
54liuyao 已提交
2618
  pInfo->pUpdatedMap = NULL;
5
54liuyao 已提交
2619
  taosArraySort(pInfo->pUpdated, winPosCmprImpl);
5
54liuyao 已提交
2620

5
54liuyao 已提交
2621 2622
  initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
  pInfo->pUpdated = NULL;
5
54liuyao 已提交
2623
  blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
5
54liuyao 已提交
2624 2625 2626 2627

  doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes);
  if (pInfo->pPullDataRes->info.rows != 0) {
    // process the rest of the data
5
54liuyao 已提交
2628
    printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
5
54liuyao 已提交
2629 2630 2631
    return pInfo->pPullDataRes;
  }

2632
  doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
5
54liuyao 已提交
2633 2634 2635 2636 2637 2638
  if (pInfo->pDelRes->info.rows != 0) {
    // process the rest of the data
    printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
    return pInfo->pDelRes;
  }

2639
  doBuildStreamIntervalResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
5
54liuyao 已提交
2640
  if (pInfo->binfo.pRes->info.rows != 0) {
5
54liuyao 已提交
2641
    printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
5
54liuyao 已提交
2642 2643 2644 2645 2646 2647
    return pInfo->binfo.pRes;
  }

  return NULL;
}

5
54liuyao 已提交
2648 2649 2650 2651
int64_t getDeleteMark(SIntervalPhysiNode* pIntervalPhyNode) {
  if (pIntervalPhyNode->window.deleteMark <= 0) {
    return DEAULT_DELETE_MARK;
  }
L
Liu Jicong 已提交
2652
  int64_t deleteMark = TMAX(pIntervalPhyNode->window.deleteMark, pIntervalPhyNode->window.watermark);
5
54liuyao 已提交
2653 2654 2655 2656
  deleteMark = TMAX(deleteMark, pIntervalPhyNode->interval);
  return deleteMark;
}

5
54liuyao 已提交
2657
TSKEY compareTs(void* pKey) {
dengyihao's avatar
dengyihao 已提交
2658
  SWinKey* pWinKey = (SWinKey*)pKey;
5
54liuyao 已提交
2659 2660 2661
  return pWinKey->ts;
}

S
shenglian zhou 已提交
2662 2663
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
                                                     SExecTaskInfo* pTaskInfo, int32_t numOfChild) {
2664 2665 2666
  SIntervalPhysiNode*          pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
  SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo));
  SOperatorInfo*               pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
5
54liuyao 已提交
2667 2668 2669
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }
2670

2671
  pOperator->pTaskInfo = pTaskInfo;
S
shenglian zhou 已提交
2672 2673 2674 2675 2676 2677 2678 2679
  pInfo->interval = (SInterval){.interval = pIntervalPhyNode->interval,
                                .sliding = pIntervalPhyNode->sliding,
                                .intervalUnit = pIntervalPhyNode->intervalUnit,
                                .slidingUnit = pIntervalPhyNode->slidingUnit,
                                .offset = pIntervalPhyNode->offset,
                                .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
  pInfo->twAggSup = (STimeWindowAggSupp){
      .waterMark = pIntervalPhyNode->window.watermark,
5
54liuyao 已提交
2680 2681
      .calTrigger = pIntervalPhyNode->window.triggerType,
      .maxTs = INT64_MIN,
5
54liuyao 已提交
2682
      .minTs = INT64_MAX,
5
54liuyao 已提交
2683
      .deleteMark = getDeleteMark(pIntervalPhyNode),
L
Liu Jicong 已提交
2684 2685
      .deleteMarkSaved = 0,
      .calTriggerSaved = 0,
L
liuyao 已提交
2686
      .checkPointTs = 0,
dengyihao's avatar
dengyihao 已提交
2687 2688
      .checkPointInterval =
          convertTimePrecision(tsCheckpointInterval, TSDB_TIME_PRECISION_MILLI, pInfo->interval.precision),
S
shenglian zhou 已提交
2689
  };
5
54liuyao 已提交
2690
  ASSERTS(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay");
5
54liuyao 已提交
2691 2692
  pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
2693
  initResultSizeInfo(&pOperator->resultInfo, 4096);
5
54liuyao 已提交
2694 2695 2696 2697 2698 2699 2700 2701 2702
  if (pIntervalPhyNode->window.pExprs != NULL) {
    int32_t    numOfScalar = 0;
    SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar);
    int32_t    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

S
shenglian zhou 已提交
2703 2704
  int32_t      numOfCols = 0;
  SExprInfo*   pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols);
H
Haojun Liao 已提交
2705
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
5
54liuyao 已提交
2706
  initBasicInfo(&pInfo->binfo, pResBlock);
2707

L
Liu Jicong 已提交
2708 2709
  int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
                            pTaskInfo->streamInfo.pState);
2710 2711 2712 2713
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
2714
  initStreamFunciton(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs);
2715

5
54liuyao 已提交
2716
  initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
2717

2718 2719 2720 2721
  pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
  *(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
  streamStateSetNumber(pInfo->pState, -1);

2722
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
L
liuyao 已提交
2723
  pInfo->numOfChild = numOfChild;
5
54liuyao 已提交
2724

2725
  pInfo->pPhyNode = (SPhysiNode*)nodesCloneNode((SNode*)pPhyNode);
5
54liuyao 已提交
2726

5
54liuyao 已提交
2727 2728 2729 2730
  if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
    pInfo->isFinal = true;
    pOperator->name = "StreamFinalIntervalOperator";
  } else {
5
54liuyao 已提交
2731
    // semi interval operator does not catch result
5
54liuyao 已提交
2732 2733 2734 2735
    pInfo->isFinal = false;
    pOperator->name = "StreamSemiIntervalOperator";
  }

5
54liuyao 已提交
2736
  if (!IS_FINAL_OP(pInfo) || numOfChild == 0) {
5
54liuyao 已提交
2737 2738
    pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
  }
5
54liuyao 已提交
2739 2740 2741 2742
  pInfo->pPullWins = taosArrayInit(8, sizeof(SPullWindowInfo));
  pInfo->pullIndex = 0;
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  pInfo->pPullDataMap = taosHashInit(64, hashFn, false, HASH_NO_LOCK);
2743
  pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE);
5
54liuyao 已提交
2744
  pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired;
2745
  pInfo->ignoreExpiredDataSaved = false;
2746
  pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
2747
  pInfo->delIndex = 0;
H
Haojun Liao 已提交
2748
  pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
2749 2750
  pInfo->delKey.ts = INT64_MAX;
  pInfo->delKey.groupId = 0;
5
54liuyao 已提交
2751
  pInfo->numOfDatapack = 0;
5
54liuyao 已提交
2752 2753
  pInfo->pUpdated = NULL;
  pInfo->pUpdatedMap = NULL;
dengyihao's avatar
dengyihao 已提交
2754 2755
  pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize,
                                                  compareTs, pInfo->pState, pInfo->twAggSup.deleteMark);
L
liuyao 已提交
2756
  pInfo->dataVersion = 0;
5
54liuyao 已提交
2757

5
54liuyao 已提交
2758
  pOperator->operatorType = pPhyNode->type;
5
54liuyao 已提交
2759 2760 2761 2762
  pOperator->blocking = true;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;

L
Liu Jicong 已提交
2763 2764
  pOperator->fpSet = createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo,
                                         optrDefaultBufFn, NULL);
2765
  if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
L
liuyao 已提交
2766
    initIntervalDownStream(downstream, pPhyNode->type, pInfo);
2767
  }
5
54liuyao 已提交
2768 2769 2770 2771 2772 2773 2774 2775
  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
2776
  destroyStreamFinalIntervalOperatorInfo(pInfo);
5
54liuyao 已提交
2777 2778 2779
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
5
54liuyao 已提交
2780
}
5
54liuyao 已提交
2781 2782

void destroyStreamAggSupporter(SStreamAggSupporter* pSup) {
5
54liuyao 已提交
2783
  tSimpleHashCleanup(pSup->pResultRows);
5
54liuyao 已提交
2784 2785
  destroyDiskbasedBuf(pSup->pResultBuf);
  blockDataDestroy(pSup->pScanBlock);
5
54liuyao 已提交
2786 2787
  taosMemoryFreeClear(pSup->pState);
  taosMemoryFreeClear(pSup->pDummyCtx);
5
54liuyao 已提交
2788 2789
}

2790
void destroyStreamSessionAggOperatorInfo(void* param) {
5
54liuyao 已提交
2791
  SStreamSessionAggOperatorInfo* pInfo = (SStreamSessionAggOperatorInfo*)param;
2792
  cleanupBasicInfo(&pInfo->binfo);
5
54liuyao 已提交
2793
  destroyStreamAggSupporter(&pInfo->streamAggSup);
5
54liuyao 已提交
2794

2795 2796 2797
  if (pInfo->pChildren != NULL) {
    int32_t size = taosArrayGetSize(pInfo->pChildren);
    for (int32_t i = 0; i < size; i++) {
5
54liuyao 已提交
2798 2799
      SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, i);
      destroyOperatorInfo(pChild);
2800
    }
5
54liuyao 已提交
2801
    taosArrayDestroy(pInfo->pChildren);
2802
  }
5
54liuyao 已提交
2803 2804 2805 2806
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
  blockDataDestroy(pInfo->pDelRes);
  blockDataDestroy(pInfo->pWinBlock);
  blockDataDestroy(pInfo->pUpdateRes);
5
54liuyao 已提交
2807
  tSimpleHashCleanup(pInfo->pStDeleted);
2808

D
dapan1121 已提交
2809
  taosMemoryFreeClear(param);
5
54liuyao 已提交
2810 2811
}

2812 2813
int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfCols,
                        SSDataBlock* pResultBlock) {
H
Haojun Liao 已提交
2814
  initBasicInfo(pBasicInfo, pResultBlock);
2815 2816 2817 2818
  int32_t code = initExprSupp(pSup, pExprInfo, numOfCols);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }
2819

H
Haojun Liao 已提交
2820
  initStreamFunciton(pSup->pCtx, pSup->numOfExprs);
5
54liuyao 已提交
2821
  for (int32_t i = 0; i < numOfCols; ++i) {
2822
    pSup->pCtx[i].saveHandle.pBuf = NULL;
5
54liuyao 已提交
2823
  }
2824

2825
  ASSERT(numOfCols > 0);
5
54liuyao 已提交
2826 2827 2828 2829 2830 2831
  return TSDB_CODE_SUCCESS;
}

void initDummyFunction(SqlFunctionCtx* pDummy, SqlFunctionCtx* pCtx, int32_t nums) {
  for (int i = 0; i < nums; i++) {
    pDummy[i].functionId = pCtx[i].functionId;
2832 2833
    pDummy[i].isNotNullFunc = pCtx[i].isNotNullFunc;
    pDummy[i].isPseudoFunc = pCtx[i].isPseudoFunc;
5
54liuyao 已提交
2834 2835
  }
}
5
54liuyao 已提交
2836

5
54liuyao 已提交
2837 2838
void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, int32_t tsColIndex,
                    STimeWindowAggSupp* pTwSup) {
2839 2840 2841 2842 2843 2844
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION) {
    SStreamPartitionOperatorInfo* pScanInfo = downstream->info;
    pScanInfo->tsColIndex = tsColIndex;
  }

  if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
5
54liuyao 已提交
2845
    initDownStream(downstream->pDownstream[0], pAggSup, type, tsColIndex, pTwSup);
2846 2847
    return;
  }
2848
  SStreamScanInfo* pScanInfo = downstream->info;
5
54liuyao 已提交
2849
  pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type};
5
54liuyao 已提交
2850
  if (!pScanInfo->igCheckUpdate && !pScanInfo->pUpdateInfo) {
5
54liuyao 已提交
2851
    pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark);
5
54liuyao 已提交
2852
  }
5
54liuyao 已提交
2853
  pScanInfo->twAggSup = *pTwSup;
5
54liuyao 已提交
2854 2855
}

5
54liuyao 已提交
2856 2857 2858 2859 2860 2861 2862 2863 2864 2865
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, int64_t gap,
                               SStreamState* pState, int32_t keySize, int16_t keyType) {
  pSup->resultRowSize = keySize + getResultRowSize(pCtx, numOfOutput);
  pSup->pScanBlock = createSpecialDataBlock(STREAM_CLEAR);
  pSup->gap = gap;
  pSup->stateKeySize = keySize;
  pSup->stateKeyType = keyType;
  pSup->pDummyCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfOutput, sizeof(SqlFunctionCtx));
  if (pSup->pDummyCtx == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
5
54liuyao 已提交
2866
  }
H
Haojun Liao 已提交
2867

5
54liuyao 已提交
2868 2869 2870 2871
  initDummyFunction(pSup->pDummyCtx, pCtx, numOfOutput);
  pSup->pState = taosMemoryCalloc(1, sizeof(SStreamState));
  *(pSup->pState) = *pState;
  streamStateSetNumber(pSup->pState, -1);
2872

5
54liuyao 已提交
2873 2874
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  pSup->pResultRows = tSimpleHashInit(32, hashFn);
X
Xiaoyu Wang 已提交
2875

5
54liuyao 已提交
2876 2877 2878
  int32_t pageSize = 4096;
  while (pageSize < pSup->resultRowSize * 4) {
    pageSize <<= 1u;
5
54liuyao 已提交
2879
  }
5
54liuyao 已提交
2880 2881 2882 2883
  // at least four pages need to be in buffer
  int32_t bufSize = 4096 * 256;
  if (bufSize <= pageSize) {
    bufSize = pageSize * 4;
5
54liuyao 已提交
2884
  }
5
54liuyao 已提交
2885 2886 2887 2888
  if (!osTempSpaceAvailable()) {
    terrno = TSDB_CODE_NO_AVAIL_DISK;
    qError("Init stream agg supporter failed since %s", terrstr(terrno));
    return terrno;
5
54liuyao 已提交
2889
  }
5
54liuyao 已提交
2890 2891 2892
  int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, "function", tsTempDir);
  for (int32_t i = 0; i < numOfOutput; ++i) {
    pCtx[i].saveHandle.pBuf = pSup->pResultBuf;
5
54liuyao 已提交
2893 2894
  }

5
54liuyao 已提交
2895
  return TSDB_CODE_SUCCESS;
5
54liuyao 已提交
2896
}
5
54liuyao 已提交
2897 2898

bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap) {
5
54liuyao 已提交
2899
  if (ts + gap >= pWin->skey && ts - gap <= pWin->ekey) {
5
54liuyao 已提交
2900 2901 2902 2903 2904
    return true;
  }
  return false;
}

5
54liuyao 已提交
2905 2906
bool isInWindow(SResultWindowInfo* pWinInfo, TSKEY ts, int64_t gap) {
  return isInTimeWindow(&pWinInfo->sessionWin.win, ts, gap);
5
54liuyao 已提交
2907 2908
}

5
54liuyao 已提交
2909 2910 2911 2912 2913
void getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
                         SSessionKey* pKey) {
  pKey->win.skey = startTs;
  pKey->win.ekey = endTs;
  pKey->groupId = groupId;
2914
  int32_t code = streamStateSessionGetKeyByRange(pAggSup->pState, pKey, pKey);
5
54liuyao 已提交
2915 2916
  if (code != TSDB_CODE_SUCCESS) {
    SET_SESSION_WIN_KEY_INVALID(pKey);
2917 2918 2919
  }
}

5
54liuyao 已提交
2920
bool isInvalidSessionWin(SResultWindowInfo* pWinInfo) { return pWinInfo->sessionWin.win.skey == 0; }
5
54liuyao 已提交
2921

5
54liuyao 已提交
2922 2923 2924
void setSessionOutputBuf(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
                         SResultWindowInfo* pCurWin) {
  pCurWin->sessionWin.groupId = groupId;
2925 2926
  pCurWin->sessionWin.win.skey = startTs;
  pCurWin->sessionWin.win.ekey = endTs;
5
54liuyao 已提交
2927
  int32_t size = pAggSup->resultRowSize;
2928 2929
  int32_t code =
      streamStateSessionAddIfNotExist(pAggSup->pState, &pCurWin->sessionWin, pAggSup->gap, &pCurWin->pOutputBuf, &size);
5
54liuyao 已提交
2930 2931 2932 2933 2934
  if (code == TSDB_CODE_SUCCESS) {
    pCurWin->isOutput = true;
  } else {
    pCurWin->sessionWin.win.skey = startTs;
    pCurWin->sessionWin.win.ekey = endTs;
5
54liuyao 已提交
2935
  }
5
54liuyao 已提交
2936
}
5
54liuyao 已提交
2937

5
54liuyao 已提交
2938 2939
int32_t getSessionWinBuf(SStreamAggSupporter* pAggSup, SStreamStateCur* pCur, SResultWindowInfo* pWinInfo) {
  int32_t size = 0;
2940
  int32_t code = streamStateSessionGetKVByCur(pCur, &pWinInfo->sessionWin, &pWinInfo->pOutputBuf, &size);
5
54liuyao 已提交
2941 2942
  if (code != TSDB_CODE_SUCCESS) {
    return code;
5
54liuyao 已提交
2943
  }
5
54liuyao 已提交
2944 2945 2946 2947 2948 2949
  streamStateCurNext(pAggSup->pState, pCur);
  return TSDB_CODE_SUCCESS;
}
void saveDeleteInfo(SArray* pWins, SSessionKey key) {
  // key.win.ekey = key.win.skey;
  taosArrayPush(pWins, &key);
5
54liuyao 已提交
2950 2951
}

5
54liuyao 已提交
2952 2953 2954 2955
void saveDeleteRes(SSHashObj* pStDelete, SSessionKey key) {
  key.win.ekey = key.win.skey;
  tSimpleHashPut(pStDelete, &key, sizeof(SSessionKey), NULL, 0);
}
2956

5
54liuyao 已提交
2957 2958 2959 2960 2961
static void removeSessionResult(SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey key) {
  key.win.ekey = key.win.skey;
  tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey));
  tSimpleHashRemove(pResMap, &key, sizeof(SSessionKey));
}
5
54liuyao 已提交
2962

5
54liuyao 已提交
2963 2964 2965 2966 2967
static void getSessionHashKey(const SSessionKey* pKey, SSessionKey* pHashKey) {
  *pHashKey = *pKey;
  pHashKey->win.ekey = pKey->win.skey;
}

5
54liuyao 已提交
2968 2969 2970
static void removeSessionResults(SSHashObj* pHashMap, SArray* pWins) {
  if (tSimpleHashGetSize(pHashMap) == 0) {
    return;
5
54liuyao 已提交
2971
  }
5
54liuyao 已提交
2972 2973 2974 2975
  int32_t size = taosArrayGetSize(pWins);
  for (int32_t i = 0; i < size; i++) {
    SSessionKey* pWin = taosArrayGet(pWins, i);
    if (!pWin) continue;
5
54liuyao 已提交
2976 2977
    SSessionKey key = {0};
    getSessionHashKey(pWin, &key);
5
54liuyao 已提交
2978
    tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey));
5
54liuyao 已提交
2979 2980 2981
  }
}

dengyihao's avatar
dengyihao 已提交
2982
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t groupId,
5
54liuyao 已提交
2983 2984
                                int32_t rows, int32_t start, int64_t gap, SSHashObj* pResultRows, SSHashObj* pStUpdated,
                                SSHashObj* pStDeleted) {
5
54liuyao 已提交
2985
  for (int32_t i = start; i < rows; ++i) {
2986
    if (!isInWindow(pWinInfo, pStartTs[i], gap) && (!pEndTs || !isInWindow(pWinInfo, pEndTs[i], gap))) {
5
54liuyao 已提交
2987 2988
      return i - start;
    }
5
54liuyao 已提交
2989
    if (pWinInfo->sessionWin.win.skey > pStartTs[i]) {
5
54liuyao 已提交
2990
      if (pStDeleted && pWinInfo->isOutput) {
5
54liuyao 已提交
2991
        saveDeleteRes(pStDeleted, pWinInfo->sessionWin);
5
54liuyao 已提交
2992
      }
5
54liuyao 已提交
2993 2994
      removeSessionResult(pStUpdated, pResultRows, pWinInfo->sessionWin);
      pWinInfo->sessionWin.win.skey = pStartTs[i];
5
54liuyao 已提交
2995
    }
5
54liuyao 已提交
2996
    pWinInfo->sessionWin.win.ekey = TMAX(pWinInfo->sessionWin.win.ekey, pStartTs[i]);
5
54liuyao 已提交
2997
    if (pEndTs) {
5
54liuyao 已提交
2998
      pWinInfo->sessionWin.win.ekey = TMAX(pWinInfo->sessionWin.win.ekey, pEndTs[i]);
5
54liuyao 已提交
2999 3000 3001 3002 3003
    }
  }
  return rows - start;
}

5
54liuyao 已提交
3004 3005
static int32_t initSessionOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pResult, SqlFunctionCtx* pCtx,
                                    int32_t numOfOutput, int32_t* rowEntryInfoOffset) {
3006
  ASSERT(pWinInfo->sessionWin.win.skey <= pWinInfo->sessionWin.win.ekey);
5
54liuyao 已提交
3007
  *pResult = (SResultRow*)pWinInfo->pOutputBuf;
5
54liuyao 已提交
3008
  // set time window for current result
5
54liuyao 已提交
3009
  (*pResult)->win = pWinInfo->sessionWin.win;
3010
  setResultRowInitCtx(*pResult, pCtx, numOfOutput, rowEntryInfoOffset);
5
54liuyao 已提交
3011 3012 3013
  return TSDB_CODE_SUCCESS;
}

5
54liuyao 已提交
3014 3015 3016
static int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindowInfo* pCurWin, SResultRow** pResult,
                                  int32_t startIndex, int32_t winRows, int32_t rows, int32_t numOutput,
                                  SOperatorInfo* pOperator) {
3017
  SExprSupp*     pSup = &pOperator->exprSupp;
3018
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
5
54liuyao 已提交
3019
  int32_t        code = initSessionOutputBuf(pCurWin, pResult, pSup->pCtx, numOutput, pSup->rowEntryInfoOffset);
5
54liuyao 已提交
3020
  if (code != TSDB_CODE_SUCCESS || (*pResult) == NULL) {
S
Shengliang Guan 已提交
3021
    return TSDB_CODE_OUT_OF_MEMORY;
5
54liuyao 已提交
3022
  }
5
54liuyao 已提交
3023
  updateTimeWindowInfo(pTimeWindowData, &pCurWin->sessionWin.win, false);
H
Haojun Liao 已提交
3024
  applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, pTimeWindowData, startIndex, winRows, rows, numOutput);
5
54liuyao 已提交
3025 3026 3027
  return TSDB_CODE_SUCCESS;
}

5
54liuyao 已提交
3028 3029
static bool doDeleteSessionWindow(SStreamAggSupporter* pAggSup, SSessionKey* pKey) {
  streamStateSessionDel(pAggSup->pState, pKey);
5
54liuyao 已提交
3030 3031 3032
  SSessionKey hashKey = {0};
  getSessionHashKey(pKey, &hashKey);
  tSimpleHashRemove(pAggSup->pResultRows, &hashKey, sizeof(SSessionKey));
5
54liuyao 已提交
3033 3034 3035 3036 3037 3038 3039 3040 3041 3042
  return true;
}

static int32_t setSessionWinOutputInfo(SSHashObj* pStUpdated, SResultWindowInfo* pWinInfo) {
  void* pVal = tSimpleHashGet(pStUpdated, &pWinInfo->sessionWin, sizeof(SSessionKey));
  if (pVal) {
    SResultWindowInfo* pWin = pVal;
    pWinInfo->isOutput = pWin->isOutput;
  }
  return TSDB_CODE_SUCCESS;
5
54liuyao 已提交
3043 3044
}

5
54liuyao 已提交
3045 3046 3047 3048 3049 3050 3051
SStreamStateCur* getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj* pStUpdated, SResultWindowInfo* pCurWin,
                                       SResultWindowInfo* pNextWin) {
  SStreamStateCur* pCur = streamStateSessionSeekKeyNext(pAggSup->pState, &pCurWin->sessionWin);
  pNextWin->isOutput = true;
  setSessionWinOutputInfo(pStUpdated, pNextWin);
  int32_t size = 0;
  pNextWin->sessionWin = pCurWin->sessionWin;
3052
  int32_t code = streamStateSessionGetKVByCur(pCur, &pNextWin->sessionWin, &pNextWin->pOutputBuf, &size);
5
54liuyao 已提交
3053 3054 3055 3056
  if (code != TSDB_CODE_SUCCESS) {
    SET_SESSION_WIN_INVALID(*pNextWin);
  }
  return pCur;
5
54liuyao 已提交
3057 3058
}

5
54liuyao 已提交
3059 3060 3061 3062 3063 3064 3065 3066 3067
static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCurWin, SSHashObj* pStUpdated,
                                 SSHashObj* pStDeleted) {
  SExprSupp*                     pSup = &pOperator->exprSupp;
  SExecTaskInfo*                 pTaskInfo = pOperator->pTaskInfo;
  SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
  SResultRow*                    pCurResult = NULL;
  int32_t                        numOfOutput = pOperator->exprSupp.numOfExprs;
  SStreamAggSupporter*           pAggSup = &pInfo->streamAggSup;
  initSessionOutputBuf(pCurWin, &pCurResult, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
5
54liuyao 已提交
3068
  // Just look for the window behind StartIndex
5
54liuyao 已提交
3069 3070 3071 3072 3073 3074
  while (1) {
    SResultWindowInfo winInfo = {0};
    SStreamStateCur*  pCur = getNextSessionWinInfo(pAggSup, pStUpdated, pCurWin, &winInfo);
    if (!IS_VALID_SESSION_WIN(winInfo) || !isInWindow(pCurWin, winInfo.sessionWin.win.skey, pAggSup->gap)) {
      streamStateFreeCur(pCur);
      break;
5
54liuyao 已提交
3075
    }
5
54liuyao 已提交
3076 3077 3078 3079 3080 3081 3082 3083 3084 3085 3086 3087
    SResultRow* pWinResult = NULL;
    initSessionOutputBuf(&winInfo, &pWinResult, pAggSup->pDummyCtx, numOfOutput, pSup->rowEntryInfoOffset);
    pCurWin->sessionWin.win.ekey = TMAX(pCurWin->sessionWin.win.ekey, winInfo.sessionWin.win.ekey);
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pCurWin->sessionWin.win, true);
    compactFunctions(pSup->pCtx, pAggSup->pDummyCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
    tSimpleHashRemove(pStUpdated, &winInfo.sessionWin, sizeof(SSessionKey));
    if (winInfo.isOutput && pStDeleted) {
      saveDeleteRes(pStDeleted, winInfo.sessionWin);
    }
    removeSessionResult(pStUpdated, pAggSup->pResultRows, winInfo.sessionWin);
    doDeleteSessionWindow(pAggSup, &winInfo.sessionWin);
    streamStateFreeCur(pCur);
5
54liuyao 已提交
3088 3089 3090
  }
}

5
54liuyao 已提交
3091 3092 3093
int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo) {
  saveSessionDiscBuf(pAggSup->pState, &pWinInfo->sessionWin, pWinInfo->pOutputBuf, pAggSup->resultRowSize);
  return TSDB_CODE_SUCCESS;
5
54liuyao 已提交
3094 3095
}

5
54liuyao 已提交
3096 3097
static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pStUpdated,
                                   SSHashObj* pStDeleted, bool hasEndTs) {
X
Xiaoyu Wang 已提交
3098
  SExecTaskInfo*                 pTaskInfo = pOperator->pTaskInfo;
5
54liuyao 已提交
3099
  SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
3100
  int32_t                        numOfOutput = pOperator->exprSupp.numOfExprs;
H
Haojun Liao 已提交
3101
  uint64_t                       groupId = pSDataBlock->info.id.groupId;
X
Xiaoyu Wang 已提交
3102
  int64_t                        code = TSDB_CODE_SUCCESS;
5
54liuyao 已提交
3103 3104 3105
  SResultRow*                    pResult = NULL;
  int32_t                        rows = pSDataBlock->info.rows;
  int32_t                        winRows = 0;
X
Xiaoyu Wang 已提交
3106

L
liuyao 已提交
3107 3108
  pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version);

5
54liuyao 已提交
3109
  SColumnInfoData* pStartTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
5
54liuyao 已提交
3110
  TSKEY*           startTsCols = (int64_t*)pStartTsCol->pData;
5
54liuyao 已提交
3111 3112 3113
  SColumnInfoData* pEndTsCol = NULL;
  if (hasEndTs) {
    pEndTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->endTsIndex);
5
54liuyao 已提交
3114
  } else {
5
54liuyao 已提交
3115
    pEndTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
5
54liuyao 已提交
3116
  }
X
Xiaoyu Wang 已提交
3117

5
54liuyao 已提交
3118
  TSKEY*               endTsCols = (int64_t*)pEndTsCol->pData;
5
54liuyao 已提交
3119
  SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
5
54liuyao 已提交
3120
  for (int32_t i = 0; i < rows;) {
5
54liuyao 已提交
3121
    if (pInfo->ignoreExpiredData && isOverdue(endTsCols[i], &pInfo->twAggSup)) {
5
54liuyao 已提交
3122 3123 3124
      i++;
      continue;
    }
5
54liuyao 已提交
3125 3126 3127 3128 3129
    SResultWindowInfo winInfo = {0};
    setSessionOutputBuf(pAggSup, startTsCols[i], endTsCols[i], groupId, &winInfo);
    setSessionWinOutputInfo(pStUpdated, &winInfo);
    winRows = updateSessionWindowInfo(&winInfo, startTsCols, endTsCols, groupId, rows, i, pAggSup->gap,
                                      pAggSup->pResultRows, pStUpdated, pStDeleted);
5
54liuyao 已提交
3130 3131
    // coverity scan error
    if (!winInfo.pOutputBuf) {
S
Shengliang Guan 已提交
3132
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
5
54liuyao 已提交
3133
    }
L
Liu Jicong 已提交
3134

5
54liuyao 已提交
3135 3136
    code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &winInfo, &pResult, i, winRows, rows, numOfOutput,
                              pOperator);
5
54liuyao 已提交
3137
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
S
Shengliang Guan 已提交
3138
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
5
54liuyao 已提交
3139
    }
5
54liuyao 已提交
3140 3141
    compactSessionWindow(pOperator, &winInfo, pStUpdated, pStDeleted);
    saveSessionOutputBuf(pAggSup, &winInfo);
5
54liuyao 已提交
3142 3143

    if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pStUpdated) {
5
54liuyao 已提交
3144
      code = saveResult(winInfo, pStUpdated);
5
54liuyao 已提交
3145
      if (code != TSDB_CODE_SUCCESS) {
S
Shengliang Guan 已提交
3146
        T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
5
54liuyao 已提交
3147
      }
5
54liuyao 已提交
3148
    }
5
54liuyao 已提交
3149
    if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
5
54liuyao 已提交
3150 3151
      SSessionKey key = {0};
      getSessionHashKey(&winInfo.sessionWin, &key);
5
54liuyao 已提交
3152 3153 3154
      tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo));
    }

5
54liuyao 已提交
3155 3156 3157 3158
    i += winRows;
  }
}

5
54liuyao 已提交
3159
static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SArray* result) {
5
54liuyao 已提交
3160
  SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
3161
  TSKEY*           startDatas = (TSKEY*)pStartTsCol->pData;
5
54liuyao 已提交
3162
  SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
3163
  TSKEY*           endDatas = (TSKEY*)pEndTsCol->pData;
3164
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
3165
  uint64_t*        gpDatas = (uint64_t*)pGroupCol->pData;
5
54liuyao 已提交
3166
  for (int32_t i = 0; i < pBlock->info.rows; i++) {
5
54liuyao 已提交
3167 3168 3169 3170
    while (1) {
      SSessionKey curWin = {0};
      getCurSessionWindow(pAggSup, startDatas[i], endDatas[i], gpDatas[i], &curWin);
      if (IS_INVALID_SESSION_WIN_KEY(curWin)) {
3171 3172
        break;
      }
5
54liuyao 已提交
3173 3174 3175 3176
      doDeleteSessionWindow(pAggSup, &curWin);
      if (result) {
        saveDeleteInfo(result, curWin);
      }
3177
    }
5
54liuyao 已提交
3178 3179 3180
  }
}

5
54liuyao 已提交
3181 3182 3183 3184 3185 3186 3187 3188
static inline int32_t sessionKeyCompareAsc(const void* pKey1, const void* pKey2) {
  SSessionKey* pWin1 = (SSessionKey*)pKey1;
  SSessionKey* pWin2 = (SSessionKey*)pKey2;

  if (pWin1->groupId > pWin2->groupId) {
    return 1;
  } else if (pWin1->groupId < pWin2->groupId) {
    return -1;
5
54liuyao 已提交
3189 3190
  }

5
54liuyao 已提交
3191 3192 3193 3194 3195 3196 3197 3198 3199 3200 3201 3202 3203
  if (pWin1->win.skey > pWin2->win.skey) {
    return 1;
  } else if (pWin1->win.skey < pWin2->win.skey) {
    return -1;
  }

  return 0;
}

static int32_t copyUpdateResult(SSHashObj* pStUpdated, SArray* pUpdated) {
  void*   pIte = NULL;
  int32_t iter = 0;
  while ((pIte = tSimpleHashIterate(pStUpdated, pIte, &iter)) != NULL) {
3204
    void* key = tSimpleHashGetKey(pIte, NULL);
5
54liuyao 已提交
3205 3206 3207 3208 3209 3210
    taosArrayPush(pUpdated, key);
  }
  taosArraySort(pUpdated, sessionKeyCompareAsc);
  return TSDB_CODE_SUCCESS;
}

3211
void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite) {
5
54liuyao 已提交
3212 3213 3214 3215
  blockDataCleanup(pBlock);
  int32_t size = tSimpleHashGetSize(pStDeleted);
  if (size == 0) {
    return;
3216 3217
  }
  blockDataEnsureCapacity(pBlock, size);
5
54liuyao 已提交
3218 3219 3220 3221 3222
  int32_t iter = 0;
  while (((*Ite) = tSimpleHashIterate(pStDeleted, *Ite, &iter)) != NULL) {
    if (pBlock->info.rows + 1 > pBlock->info.capacity) {
      break;
    }
3223
    SSessionKey*     res = tSimpleHashGetKey(*Ite, NULL);
3224
    SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
3225
    colDataSetVal(pStartTsCol, pBlock->info.rows, (const char*)&res->win.skey, false);
3226
    SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
3227
    colDataSetVal(pEndTsCol, pBlock->info.rows, (const char*)&res->win.skey, false);
3228
    SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
3229
    colDataSetNULL(pUidCol, pBlock->info.rows);
5
54liuyao 已提交
3230
    SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
3231
    colDataSetVal(pGpCol, pBlock->info.rows, (const char*)&res->groupId, false);
3232
    SColumnInfoData* pCalStCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
3233
    colDataSetNULL(pCalStCol, pBlock->info.rows);
3234
    SColumnInfoData* pCalEdCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
3235
    colDataSetNULL(pCalEdCol, pBlock->info.rows);
3236 3237

    SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
3238 3239 3240

    void* tbname = NULL;
    streamStateGetParName(pOp->pTaskInfo->streamInfo.pState, res->groupId, &tbname);
3241
    if (tbname == NULL) {
3242
      colDataSetNULL(pTableCol, pBlock->info.rows);
3243 3244 3245
    } else {
      char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN];
      STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
3246
      colDataSetVal(pTableCol, pBlock->info.rows, (const char*)parTbName, false);
dengyihao's avatar
dengyihao 已提交
3247
      streamFreeVal(tbname);
3248
    }
5
54liuyao 已提交
3249 3250 3251
    pBlock->info.rows += 1;
  }
  if ((*Ite) == NULL) {
5
54liuyao 已提交
3252
    tSimpleHashClear(pStDeleted);
5
54liuyao 已提交
3253 3254 3255
  }
}

5
54liuyao 已提交
3256 3257 3258 3259 3260 3261 3262
static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SSHashObj* pStUpdated) {
  SExprSupp*                     pSup = &pOperator->exprSupp;
  SExecTaskInfo*                 pTaskInfo = pOperator->pTaskInfo;
  int32_t                        size = taosArrayGetSize(pWinArray);
  SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
  SStreamAggSupporter*           pAggSup = &pInfo->streamAggSup;
  int32_t                        numOfOutput = pSup->numOfExprs;
L
liuyao 已提交
3263
  int32_t                        numOfChild = taosArrayGetSize(pInfo->pChildren);
3264

3265
  for (int32_t i = 0; i < size; i++) {
5
54liuyao 已提交
3266 3267 3268
    SSessionKey*      pWinKey = taosArrayGet(pWinArray, i);
    int32_t           num = 0;
    SResultWindowInfo parentWin = {0};
L
liuyao 已提交
3269
    for (int32_t j = 0; j < numOfChild; j++) {
X
Xiaoyu Wang 已提交
3270
      SOperatorInfo*                 pChild = taosArrayGetP(pInfo->pChildren, j);
3271
      SStreamSessionAggOperatorInfo* pChInfo = pChild->info;
5
54liuyao 已提交
3272
      SStreamAggSupporter*           pChAggSup = &pChInfo->streamAggSup;
5
54liuyao 已提交
3273 3274
      SSessionKey                    chWinKey = {0};
      getSessionHashKey(pWinKey, &chWinKey);
3275 3276 3277
      SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pChAggSup->pState, &chWinKey);
      SResultRow*      pResult = NULL;
      SResultRow*      pChResult = NULL;
5
54liuyao 已提交
3278 3279 3280 3281 3282 3283 3284 3285 3286 3287 3288 3289
      while (1) {
        SResultWindowInfo childWin = {0};
        childWin.sessionWin = *pWinKey;
        int32_t code = getSessionWinBuf(pChAggSup, pCur, &childWin);
        if (code == TSDB_CODE_SUCCESS && pWinKey->win.skey <= childWin.sessionWin.win.skey &&
            childWin.sessionWin.win.ekey <= pWinKey->win.ekey) {
          if (num == 0) {
            setSessionOutputBuf(pAggSup, pWinKey->win.skey, pWinKey->win.ekey, pWinKey->groupId, &parentWin);
            code = initSessionOutputBuf(&parentWin, &pResult, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
            if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
              break;
            }
3290
          }
5
54liuyao 已提交
3291 3292 3293 3294 3295 3296 3297 3298
          num++;
          updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin.sessionWin.win, true);
          initSessionOutputBuf(&childWin, &pChResult, pChild->exprSupp.pCtx, numOfOutput,
                               pChild->exprSupp.rowEntryInfoOffset);
          compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
          compactSessionWindow(pOperator, &parentWin, pStUpdated, NULL);
          saveResult(parentWin, pStUpdated);
        } else {
5
54liuyao 已提交
3299
          break;
3300 3301
        }
      }
5
54liuyao 已提交
3302 3303 3304 3305
      streamStateFreeCur(pCur);
    }
    if (num > 0) {
      saveSessionOutputBuf(pAggSup, &parentWin);
3306 3307 3308 3309
    }
  }
}

5
54liuyao 已提交
3310 3311 3312 3313 3314 3315 3316 3317 3318 3319
int32_t closeSessionWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SSHashObj* pClosed) {
  void*   pIte = NULL;
  int32_t iter = 0;
  while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
    SResultWindowInfo* pWinInfo = pIte;
    if (isCloseWindow(&pWinInfo->sessionWin.win, pTwSup)) {
      if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE && pClosed) {
        int32_t code = saveResult(*pWinInfo, pClosed);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
5
54liuyao 已提交
3320 3321
        }
      }
3322
      SSessionKey* pKey = tSimpleHashGetKey(pIte, NULL);
3323
      tSimpleHashIterateRemove(pHashMap, pKey, sizeof(SSessionKey), &pIte, &iter);
5
54liuyao 已提交
3324 3325 3326 3327 3328
    }
  }
  return TSDB_CODE_SUCCESS;
}

5
54liuyao 已提交
3329
static void closeChildSessionWindow(SArray* pChildren, TSKEY maxTs) {
5
54liuyao 已提交
3330 3331 3332 3333 3334
  int32_t size = taosArrayGetSize(pChildren);
  for (int32_t i = 0; i < size; i++) {
    SOperatorInfo*                 pChildOp = taosArrayGetP(pChildren, i);
    SStreamSessionAggOperatorInfo* pChInfo = pChildOp->info;
    pChInfo->twAggSup.maxTs = TMAX(pChInfo->twAggSup.maxTs, maxTs);
5
54liuyao 已提交
3335
    closeSessionWindow(pChInfo->streamAggSup.pResultRows, &pChInfo->twAggSup, NULL);
5
54liuyao 已提交
3336 3337 3338
  }
}

5
54liuyao 已提交
3339 3340 3341 3342
int32_t getAllSessionWindow(SSHashObj* pHashMap, SSHashObj* pStUpdated) {
  void*   pIte = NULL;
  int32_t iter = 0;
  while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
3343
    SResultWindowInfo* pWinInfo = pIte;
5
54liuyao 已提交
3344
    saveResult(*pWinInfo, pStUpdated);
5
54liuyao 已提交
3345 3346 3347 3348
  }
  return TSDB_CODE_SUCCESS;
}

5
54liuyao 已提交
3349
static void copyDeleteWindowInfo(SArray* pResWins, SSHashObj* pStDeleted) {
5
54liuyao 已提交
3350 3351
  int32_t size = taosArrayGetSize(pResWins);
  for (int32_t i = 0; i < size; i++) {
5
54liuyao 已提交
3352 3353
    SSessionKey* pWinKey = taosArrayGet(pResWins, i);
    if (!pWinKey) continue;
5
54liuyao 已提交
3354 3355
    SSessionKey winInfo = {0};
    getSessionHashKey(pWinKey, &winInfo);
5
54liuyao 已提交
3356
    tSimpleHashPut(pStDeleted, &winInfo, sizeof(SSessionKey), NULL, 0);
3357 3358 3359
  }
}

H
Haojun Liao 已提交
3360
// the allocated memory comes from outer function.
5
54liuyao 已提交
3361 3362 3363
void initGroupResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList) {
  pGroupResInfo->pRows = pArrayList;
  pGroupResInfo->index = 0;
H
Haojun Liao 已提交
3364
  pGroupResInfo->pBuf = NULL;
3365 3366
}

5
54liuyao 已提交
3367 3368 3369 3370 3371 3372 3373 3374
void doBuildSessionResult(SOperatorInfo* pOperator, SStreamState* pState, SGroupResInfo* pGroupResInfo,
                          SSDataBlock* pBlock) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  // set output datablock version
  pBlock->info.version = pTaskInfo->version;

  blockDataCleanup(pBlock);
  if (!hasRemainResults(pGroupResInfo)) {
H
Haojun Liao 已提交
3375
    cleanupGroupResInfo(pGroupResInfo);
3376 3377 3378
    return;
  }

5
54liuyao 已提交
3379
  // clear the existed group id
H
Haojun Liao 已提交
3380
  pBlock->info.id.groupId = 0;
3381
  buildSessionResultDataBlock(pOperator, pState, pBlock, &pOperator->exprSupp, pGroupResInfo);
5
54liuyao 已提交
3382 3383
}

5
54liuyao 已提交
3384
static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
5
54liuyao 已提交
3385
  SExprSupp*                     pSup = &pOperator->exprSupp;
5
54liuyao 已提交
3386
  SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
X
Xiaoyu Wang 已提交
3387
  SOptrBasicInfo*                pBInfo = &pInfo->binfo;
5
54liuyao 已提交
3388
  SStreamAggSupporter*           pAggSup = &pInfo->streamAggSup;
5
54liuyao 已提交
3389 3390 3391
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  } else if (pOperator->status == OP_RES_TO_RETURN) {
3392
    doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
5
54liuyao 已提交
3393
    if (pInfo->pDelRes->info.rows > 0) {
5
54liuyao 已提交
3394
      printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
5
54liuyao 已提交
3395 3396
      return pInfo->pDelRes;
    }
5
54liuyao 已提交
3397 3398 3399 3400
    doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes);
    if (pBInfo->pRes->info.rows > 0) {
      printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
      return pBInfo->pRes;
5
54liuyao 已提交
3401
    }
5
54liuyao 已提交
3402

H
Haojun Liao 已提交
3403
    setOperatorCompleted(pOperator);
5
54liuyao 已提交
3404
    return NULL;
5
54liuyao 已提交
3405 3406 3407
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];
5
54liuyao 已提交
3408 3409 3410 3411
  if (!pInfo->pUpdated) {
    pInfo->pUpdated = taosArrayInit(16, sizeof(SSessionKey));
  }
  if (!pInfo->pStUpdated) {
3412
    _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
5
54liuyao 已提交
3413 3414
    pInfo->pStUpdated = tSimpleHashInit(64, hashFn);
  }
5
54liuyao 已提交
3415 3416 3417 3418 3419
  while (1) {
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
    if (pBlock == NULL) {
      break;
    }
5
54liuyao 已提交
3420
    printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "final session recv" : "single session recv");
3421

5
54liuyao 已提交
3422 3423 3424
    if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
        pBlock->info.type == STREAM_CLEAR) {
      SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
5
54liuyao 已提交
3425
      // gap must be 0
5
54liuyao 已提交
3426
      doDeleteTimeWindows(pAggSup, pBlock, pWins);
5
54liuyao 已提交
3427
      removeSessionResults(pInfo->pStUpdated, pWins);
5
54liuyao 已提交
3428 3429 3430 3431 3432
      if (IS_FINAL_OP(pInfo)) {
        int32_t                        childIndex = getChildIndex(pBlock);
        SOperatorInfo*                 pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
        SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info;
        // gap must be 0
5
54liuyao 已提交
3433
        doDeleteTimeWindows(&pChildInfo->streamAggSup, pBlock, NULL);
5
54liuyao 已提交
3434
        rebuildSessionWindow(pOperator, pWins, pInfo->pStUpdated);
5
54liuyao 已提交
3435 3436 3437 3438
      }
      copyDeleteWindowInfo(pWins, pInfo->pStDeleted);
      taosArrayDestroy(pWins);
      continue;
3439
    } else if (pBlock->info.type == STREAM_GET_ALL) {
5
54liuyao 已提交
3440
      getAllSessionWindow(pAggSup->pResultRows, pInfo->pStUpdated);
5
54liuyao 已提交
3441
      continue;
5
54liuyao 已提交
3442 3443 3444 3445
    } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
      return pBlock;
    } else {
      ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
5
54liuyao 已提交
3446
    }
5
54liuyao 已提交
3447

5
54liuyao 已提交
3448 3449 3450 3451
    if (pInfo->scalarSupp.pExprInfo != NULL) {
      SExprSupp* pExprSup = &pInfo->scalarSupp;
      projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
    }
3452
    // the pDataBlock are always the same one, no need to call this again
3453
    setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
5
54liuyao 已提交
3454
    doStreamSessionAggImpl(pOperator, pBlock, pInfo->pStUpdated, pInfo->pStDeleted, IS_FINAL_OP(pInfo));
5
54liuyao 已提交
3455 3456 3457 3458 3459
    if (IS_FINAL_OP(pInfo)) {
      int32_t chIndex = getChildIndex(pBlock);
      int32_t size = taosArrayGetSize(pInfo->pChildren);
      // if chIndex + 1 - size > 0, add new child
      for (int32_t i = 0; i < chIndex + 1 - size; i++) {
3460 3461
        SOperatorInfo* pChildOp =
            createStreamFinalSessionAggOperatorInfo(NULL, pInfo->pPhyNode, pOperator->pTaskInfo, 0);
5
54liuyao 已提交
3462
        if (!pChildOp) {
S
Shengliang Guan 已提交
3463
          T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
5
54liuyao 已提交
3464 3465 3466
        }
        taosArrayPush(pInfo->pChildren, &pChildOp);
      }
3467
      SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex);
3468
      setInputDataBlock(&pChildOp->exprSupp, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
5
54liuyao 已提交
3469
      doStreamSessionAggImpl(pChildOp, pBlock, NULL, NULL, true);
3470
    }
5
54liuyao 已提交
3471 3472
    pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
    pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.watermark);
5
54liuyao 已提交
3473 3474 3475
  }
  // restore the value
  pOperator->status = OP_RES_TO_RETURN;
H
Haojun Liao 已提交
3476

5
54liuyao 已提交
3477
  closeSessionWindow(pAggSup->pResultRows, &pInfo->twAggSup, pInfo->pStUpdated);
5
54liuyao 已提交
3478
  closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs);
5
54liuyao 已提交
3479 3480 3481 3482 3483 3484
  copyUpdateResult(pInfo->pStUpdated, pInfo->pUpdated);
  removeSessionResults(pInfo->pStDeleted, pInfo->pUpdated);
  tSimpleHashCleanup(pInfo->pStUpdated);
  pInfo->pStUpdated = NULL;
  initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
  pInfo->pUpdated = NULL;
5
54liuyao 已提交
3485
  blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
5
54liuyao 已提交
3486

3487 3488 3489 3490 3491 3492
#if 0
  char* pBuf = streamStateSessionDump(pAggSup->pState);
  qDebug("===stream===final session%s", pBuf);
  taosMemoryFree(pBuf);
#endif

3493
  doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
5
54liuyao 已提交
3494
  if (pInfo->pDelRes->info.rows > 0) {
5
54liuyao 已提交
3495
    printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
5
54liuyao 已提交
3496 3497
    return pInfo->pDelRes;
  }
5
54liuyao 已提交
3498 3499 3500 3501 3502 3503 3504

  doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes);
  if (pBInfo->pRes->info.rows > 0) {
    printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
    return pBInfo->pRes;
  }

H
Haojun Liao 已提交
3505
  setOperatorCompleted(pOperator);
5
54liuyao 已提交
3506
  return NULL;
5
54liuyao 已提交
3507 3508
}

5
54liuyao 已提交
3509 3510 3511 3512 3513 3514 3515 3516 3517 3518 3519 3520 3521 3522 3523 3524 3525 3526 3527 3528
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
                                                  SExecTaskInfo* pTaskInfo) {
  SSessionWinodwPhysiNode*       pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
  int32_t                        numOfCols = 0;
  int32_t                        code = TSDB_CODE_OUT_OF_MEMORY;
  SStreamSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamSessionAggOperatorInfo));
  SOperatorInfo*                 pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  pOperator->pTaskInfo = pTaskInfo;

  initResultSizeInfo(&pOperator->resultInfo, 4096);
  if (pSessionNode->window.pExprs != NULL) {
    int32_t    numOfScalar = 0;
    SExprInfo* pScalarExprInfo = createExprInfo(pSessionNode->window.pExprs, NULL, &numOfScalar);
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
5
54liuyao 已提交
3529 3530
    }
  }
5
54liuyao 已提交
3531 3532 3533
  SExprSupp* pSup = &pOperator->exprSupp;

  SExprInfo*   pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols);
H
Haojun Liao 已提交
3534
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
5
54liuyao 已提交
3535 3536 3537 3538 3539 3540 3541 3542 3543 3544 3545 3546 3547 3548 3549 3550 3551 3552 3553 3554 3555 3556 3557 3558 3559 3560 3561 3562 3563 3564 3565 3566 3567 3568
  code = initBasicInfoEx(&pInfo->binfo, pSup, pExprInfo, numOfCols, pResBlock);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  code = initStreamAggSupporter(&pInfo->streamAggSup, pSup->pCtx, numOfCols, pSessionNode->gap,
                                pTaskInfo->streamInfo.pState, 0, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->twAggSup = (STimeWindowAggSupp){
      .waterMark = pSessionNode->window.watermark,
      .calTrigger = pSessionNode->window.triggerType,
      .maxTs = INT64_MIN,
      .minTs = INT64_MAX,
  };

  initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);

  pInfo->primaryTsIndex = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
  if (pSessionNode->window.pTsEnd) {
    pInfo->endTsIndex = ((SColumnNode*)pSessionNode->window.pTsEnd)->slotId;
  }
  pInfo->binfo.pRes = pResBlock;
  pInfo->order = TSDB_ORDER_ASC;
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  pInfo->pStDeleted = tSimpleHashInit(64, hashFn);
  pInfo->pDelIterator = NULL;
  pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
  pInfo->pChildren = NULL;
  pInfo->isFinal = false;
  pInfo->pPhyNode = pPhyNode;
  pInfo->ignoreExpiredData = pSessionNode->window.igExpired;
3569
  pInfo->ignoreExpiredDataSaved = false;
5
54liuyao 已提交
3570 3571
  pInfo->pUpdated = NULL;
  pInfo->pStUpdated = NULL;
L
liuyao 已提交
3572
  pInfo->dataVersion = 0;
5
54liuyao 已提交
3573

H
Haojun Liao 已提交
3574 3575
  setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true,
                  OP_NOT_OPENED, pInfo, pTaskInfo);
L
Liu Jicong 已提交
3576 3577
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo,
                                         optrDefaultBufFn, NULL);
H
Haojun Liao 已提交
3578

5
54liuyao 已提交
3579
  if (downstream) {
5
54liuyao 已提交
3580
    initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup);
5
54liuyao 已提交
3581 3582 3583 3584 3585 3586 3587 3588 3589 3590 3591 3592 3593 3594 3595 3596 3597
    code = appendDownstream(pOperator, &downstream, 1);
  }
  return pOperator;

_error:
  if (pInfo != NULL) {
    destroyStreamSessionAggOperatorInfo(pInfo);
  }

  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

static void clearStreamSessionOperator(SStreamSessionAggOperatorInfo* pInfo) {
  tSimpleHashClear(pInfo->streamAggSup.pResultRows);
  streamStateSessionClear(pInfo->streamAggSup.pState);
5
54liuyao 已提交
3598 3599 3600 3601 3602 3603 3604
}

static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
  SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
  SOptrBasicInfo*                pBInfo = &pInfo->binfo;
  TSKEY                          maxTs = INT64_MIN;
  SExprSupp*                     pSup = &pOperator->exprSupp;
5
54liuyao 已提交
3605
  SStreamAggSupporter*           pAggSup = &pInfo->streamAggSup;
3606

5
54liuyao 已提交
3607 3608
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
3609
  }
L
Liu Jicong 已提交
3610

3611
  {
5
54liuyao 已提交
3612
    doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes);
5
54liuyao 已提交
3613
    if (pBInfo->pRes->info.rows > 0) {
H
Haojun Liao 已提交
3614
      printDataBlock(pBInfo->pRes, "semi session");
5
54liuyao 已提交
3615 3616 3617
      return pBInfo->pRes;
    }

3618
    doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
3619
    if (pInfo->pDelRes->info.rows > 0) {
5
54liuyao 已提交
3620
      printDataBlock(pInfo->pDelRes, "semi session delete");
5
54liuyao 已提交
3621 3622
      return pInfo->pDelRes;
    }
5
54liuyao 已提交
3623

3624
    if (pOperator->status == OP_RES_TO_RETURN) {
5
54liuyao 已提交
3625
      clearFunctionContext(&pOperator->exprSupp);
3626 3627
      // semi interval operator clear disk buffer
      clearStreamSessionOperator(pInfo);
H
Haojun Liao 已提交
3628
      setOperatorCompleted(pOperator);
3629 3630
      return NULL;
    }
5
54liuyao 已提交
3631 3632 3633
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];
5
54liuyao 已提交
3634 3635 3636 3637
  if (!pInfo->pUpdated) {
    pInfo->pUpdated = taosArrayInit(16, sizeof(SSessionKey));
  }
  if (!pInfo->pStUpdated) {
3638
    _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
5
54liuyao 已提交
3639 3640
    pInfo->pStUpdated = tSimpleHashInit(64, hashFn);
  }
5
54liuyao 已提交
3641 3642 3643
  while (1) {
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
    if (pBlock == NULL) {
5
54liuyao 已提交
3644
      clearSpecialDataBlock(pInfo->pUpdateRes);
3645
      pOperator->status = OP_RES_TO_RETURN;
5
54liuyao 已提交
3646 3647
      break;
    }
H
Haojun Liao 已提交
3648
    printDataBlock(pBlock, "semi session recv");
5
54liuyao 已提交
3649

5
54liuyao 已提交
3650 3651
    if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
        pBlock->info.type == STREAM_CLEAR) {
5
54liuyao 已提交
3652
      // gap must be 0
3653
      SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
5
54liuyao 已提交
3654
      doDeleteTimeWindows(&pInfo->streamAggSup, pBlock, pWins);
5
54liuyao 已提交
3655
      removeSessionResults(pInfo->pStUpdated, pWins);
5
54liuyao 已提交
3656
      copyDeleteWindowInfo(pWins, pInfo->pStDeleted);
3657
      taosArrayDestroy(pWins);
5
54liuyao 已提交
3658
      break;
5
54liuyao 已提交
3659
    } else if (pBlock->info.type == STREAM_GET_ALL) {
5
54liuyao 已提交
3660
      getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pStUpdated);
5
54liuyao 已提交
3661
      continue;
5
54liuyao 已提交
3662 3663 3664 3665
    } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
      return pBlock;
    } else {
      ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
5
54liuyao 已提交
3666 3667
    }

5
54liuyao 已提交
3668 3669 3670 3671
    if (pInfo->scalarSupp.pExprInfo != NULL) {
      SExprSupp* pExprSup = &pInfo->scalarSupp;
      projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
    }
5
54liuyao 已提交
3672
    // the pDataBlock are always the same one, no need to call this again
3673
    setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
5
54liuyao 已提交
3674
    doStreamSessionAggImpl(pOperator, pBlock, pInfo->pStUpdated, NULL, false);
5
54liuyao 已提交
3675 3676 3677 3678
    maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
  }

  pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
3679
  pBInfo->pRes->info.watermark = pInfo->twAggSup.maxTs;
3680

5
54liuyao 已提交
3681 3682 3683 3684
  copyUpdateResult(pInfo->pStUpdated, pInfo->pUpdated);
  removeSessionResults(pInfo->pStDeleted, pInfo->pUpdated);
  tSimpleHashCleanup(pInfo->pStUpdated);
  pInfo->pStUpdated = NULL;
5
54liuyao 已提交
3685

5
54liuyao 已提交
3686 3687
  initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
  pInfo->pUpdated = NULL;
5
54liuyao 已提交
3688
  blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
5
54liuyao 已提交
3689

3690 3691 3692 3693 3694 3695
#if 0
  char* pBuf = streamStateSessionDump(pAggSup->pState);
  qDebug("===stream===semi session%s", pBuf);
  taosMemoryFree(pBuf);
#endif

5
54liuyao 已提交
3696
  doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes);
5
54liuyao 已提交
3697
  if (pBInfo->pRes->info.rows > 0) {
H
Haojun Liao 已提交
3698
    printDataBlock(pBInfo->pRes, "semi session");
5
54liuyao 已提交
3699 3700 3701
    return pBInfo->pRes;
  }

3702
  doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
3703
  if (pInfo->pDelRes->info.rows > 0) {
5
54liuyao 已提交
3704
    printDataBlock(pInfo->pDelRes, "semi session delete");
5
54liuyao 已提交
3705 3706
    return pInfo->pDelRes;
  }
5
54liuyao 已提交
3707

5
54liuyao 已提交
3708 3709 3710
  clearFunctionContext(&pOperator->exprSupp);
  // semi interval operator clear disk buffer
  clearStreamSessionOperator(pInfo);
H
Haojun Liao 已提交
3711
  setOperatorCompleted(pOperator);
5
54liuyao 已提交
3712
  return NULL;
5
54liuyao 已提交
3713
}
3714

3715 3716
SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
                                                       SExecTaskInfo* pTaskInfo, int32_t numOfChild) {
3717 3718
  int32_t        code = TSDB_CODE_OUT_OF_MEMORY;
  SOperatorInfo* pOperator = createStreamSessionAggOperatorInfo(downstream, pPhyNode, pTaskInfo);
3719 3720 3721
  if (pOperator == NULL) {
    goto _error;
  }
H
Haojun Liao 已提交
3722

3723
  SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
5
54liuyao 已提交
3724

H
Haojun Liao 已提交
3725
  pInfo->isFinal = (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION);
L
Liu Jicong 已提交
3726
  char* name = (pInfo->isFinal) ? "StreamSessionFinalAggOperator" : "StreamSessionSemiAggOperator";
H
Haojun Liao 已提交
3727 3728

  if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
H
Haojun Liao 已提交
3729
    pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
5
54liuyao 已提交
3730
    blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
3731 3732
    pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAgg, NULL,
                                           destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL);
5
54liuyao 已提交
3733
  }
3734

L
Liu Jicong 已提交
3735
  setOperatorInfo(pOperator, name, pPhyNode->type, false, OP_NOT_OPENED, pInfo, pTaskInfo);
H
Haojun Liao 已提交
3736

5
54liuyao 已提交
3737 3738 3739 3740
  pOperator->operatorType = pPhyNode->type;
  if (numOfChild > 0) {
    pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*));
    for (int32_t i = 0; i < numOfChild; i++) {
5
54liuyao 已提交
3741 3742
      SOperatorInfo* pChildOp = createStreamFinalSessionAggOperatorInfo(NULL, pPhyNode, pTaskInfo, 0);
      if (pChildOp == NULL) {
5
54liuyao 已提交
3743 3744
        goto _error;
      }
5
54liuyao 已提交
3745 3746 3747 3748
      SStreamSessionAggOperatorInfo* pChInfo = pChildOp->info;
      pChInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
      streamStateSetNumber(pChInfo->streamAggSup.pState, i);
      taosArrayPush(pInfo->pChildren, &pChildOp);
3749 3750
    }
  }
3751 3752 3753 3754 3755

  if (!IS_FINAL_OP(pInfo) || numOfChild == 0) {
    pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
  }

3756 3757 3758 3759
  return pOperator;

_error:
  if (pInfo != NULL) {
3760
    destroyStreamSessionAggOperatorInfo(pInfo);
3761 3762 3763 3764 3765
  }
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}
5
54liuyao 已提交
3766

3767
void destroyStreamStateOperatorInfo(void* param) {
X
Xiaoyu Wang 已提交
3768
  SStreamStateAggOperatorInfo* pInfo = (SStreamStateAggOperatorInfo*)param;
3769
  cleanupBasicInfo(&pInfo->binfo);
5
54liuyao 已提交
3770
  destroyStreamAggSupporter(&pInfo->streamAggSup);
5
54liuyao 已提交
3771 3772 3773 3774
  cleanupGroupResInfo(&pInfo->groupResInfo);
  if (pInfo->pChildren != NULL) {
    int32_t size = taosArrayGetSize(pInfo->pChildren);
    for (int32_t i = 0; i < size; i++) {
5
54liuyao 已提交
3775 3776
      SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, i);
      destroyOperatorInfo(pChild);
5
54liuyao 已提交
3777
    }
5
54liuyao 已提交
3778
    taosArrayDestroy(pInfo->pChildren);
5
54liuyao 已提交
3779
  }
5
54liuyao 已提交
3780 3781
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
  blockDataDestroy(pInfo->pDelRes);
5
54liuyao 已提交
3782
  tSimpleHashCleanup(pInfo->pSeDeleted);
D
dapan1121 已提交
3783
  taosMemoryFreeClear(param);
5
54liuyao 已提交
3784 3785 3786
}

bool isTsInWindow(SStateWindowInfo* pWin, TSKEY ts) {
5
54liuyao 已提交
3787
  if (pWin->winInfo.sessionWin.win.skey <= ts && ts <= pWin->winInfo.sessionWin.win.ekey) {
5
54liuyao 已提交
3788 3789 3790 3791 3792 3793
    return true;
  }
  return false;
}

bool isEqualStateKey(SStateWindowInfo* pWin, char* pKeyData) {
5
54liuyao 已提交
3794 3795 3796 3797 3798 3799 3800 3801 3802 3803 3804 3805 3806 3807 3808 3809 3810 3811 3812 3813 3814 3815 3816 3817 3818 3819 3820 3821 3822 3823 3824 3825
  return pKeyData && compareVal(pKeyData, pWin->pStateKey);
}

bool compareStateKey(void* data, void* key) {
  SStateKeys* stateKey = (SStateKeys*)key;
  stateKey->pData = (char*)key + sizeof(SStateKeys);
  return compareVal(data, stateKey);
}

void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, char* pKeyData,
                       SStateWindowInfo* pCurWin, SStateWindowInfo* pNextWin) {
  int32_t size = pAggSup->resultRowSize;
  pCurWin->winInfo.sessionWin.groupId = groupId;
  pCurWin->winInfo.sessionWin.win.skey = ts;
  pCurWin->winInfo.sessionWin.win.ekey = ts;
  int32_t code =
      streamStateStateAddIfNotExist(pAggSup->pState, &pCurWin->winInfo.sessionWin, pKeyData, pAggSup->stateKeySize,
                                    compareStateKey, &pCurWin->winInfo.pOutputBuf, &size);
  pCurWin->pStateKey =
      (SStateKeys*)((char*)pCurWin->winInfo.pOutputBuf + (pAggSup->resultRowSize - pAggSup->stateKeySize));
  pCurWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys);
  pCurWin->pStateKey->type = pAggSup->stateKeyType;
  pCurWin->pStateKey->pData = (char*)pCurWin->pStateKey + sizeof(SStateKeys);
  pCurWin->pStateKey->isNull = false;

  if (code == TSDB_CODE_SUCCESS) {
    pCurWin->winInfo.isOutput = true;
  } else {
    if (IS_VAR_DATA_TYPE(pAggSup->stateKeyType)) {
      varDataCopy(pCurWin->pStateKey->pData, pKeyData);
    } else {
      memcpy(pCurWin->pStateKey->pData, pKeyData, pCurWin->pStateKey->bytes);
5
54liuyao 已提交
3826 3827 3828
    }
  }

5
54liuyao 已提交
3829 3830 3831 3832 3833 3834
  pNextWin->winInfo.sessionWin = pCurWin->winInfo.sessionWin;
  pNextWin->winInfo.pOutputBuf = NULL;
  SStreamStateCur* pCur = streamStateSessionSeekKeyNext(pAggSup->pState, &pCurWin->winInfo.sessionWin);
  code = streamStateSessionGetKVByCur(pCur, &pNextWin->winInfo.sessionWin, NULL, 0);
  if (code != TSDB_CODE_SUCCESS) {
    SET_SESSION_WIN_INVALID(pNextWin->winInfo);
5
54liuyao 已提交
3835
  }
5
54liuyao 已提交
3836
  streamStateFreeCur(pCur);
5
54liuyao 已提交
3837 3838
}

5
54liuyao 已提交
3839
int32_t updateStateWindowInfo(SStateWindowInfo* pWinInfo, SStateWindowInfo* pNextWin, TSKEY* pTs, uint64_t groupId,
H
Haojun Liao 已提交
3840
                              SColumnInfoData* pKeyCol, int32_t rows, int32_t start, bool* allEqual,
5
54liuyao 已提交
3841
                              SSHashObj* pResultRows, SSHashObj* pSeUpdated, SSHashObj* pSeDeleted) {
5
54liuyao 已提交
3842 3843 3844 3845
  *allEqual = true;
  for (int32_t i = start; i < rows; ++i) {
    char* pKeyData = colDataGetData(pKeyCol, i);
    if (!isTsInWindow(pWinInfo, pTs[i])) {
X
Xiaoyu Wang 已提交
3846
      if (isEqualStateKey(pWinInfo, pKeyData)) {
5
54liuyao 已提交
3847
        if (IS_VALID_SESSION_WIN(pNextWin->winInfo)) {
5
54liuyao 已提交
3848
          // ts belongs to the next window
5
54liuyao 已提交
3849
          if (pTs[i] >= pNextWin->winInfo.sessionWin.win.skey) {
5
54liuyao 已提交
3850 3851 3852 3853 3854 3855 3856
            return i - start;
          }
        }
      } else {
        return i - start;
      }
    }
5
54liuyao 已提交
3857 3858

    if (pWinInfo->winInfo.sessionWin.win.skey > pTs[i]) {
H
Haojun Liao 已提交
3859
      if (pSeDeleted && pWinInfo->winInfo.isOutput) {
5
54liuyao 已提交
3860
        saveDeleteRes(pSeDeleted, pWinInfo->winInfo.sessionWin);
5
54liuyao 已提交
3861
      }
5
54liuyao 已提交
3862 3863
      removeSessionResult(pSeUpdated, pResultRows, pWinInfo->winInfo.sessionWin);
      pWinInfo->winInfo.sessionWin.win.skey = pTs[i];
5
54liuyao 已提交
3864
    }
5
54liuyao 已提交
3865
    pWinInfo->winInfo.sessionWin.win.ekey = TMAX(pWinInfo->winInfo.sessionWin.win.ekey, pTs[i]);
5
54liuyao 已提交
3866 3867 3868 3869 3870 3871 3872
    if (!isEqualStateKey(pWinInfo, pKeyData)) {
      *allEqual = false;
    }
  }
  return rows - start;
}

5
54liuyao 已提交
3873 3874
static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pSeUpdated,
                                 SSHashObj* pStDeleted) {
X
Xiaoyu Wang 已提交
3875
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
5
54liuyao 已提交
3876
  SStreamStateAggOperatorInfo* pInfo = pOperator->info;
3877
  int32_t                      numOfOutput = pOperator->exprSupp.numOfExprs;
H
Haojun Liao 已提交
3878
  uint64_t                     groupId = pSDataBlock->info.id.groupId;
X
Xiaoyu Wang 已提交
3879 3880 3881 3882
  int64_t                      code = TSDB_CODE_SUCCESS;
  TSKEY*                       tsCols = NULL;
  SResultRow*                  pResult = NULL;
  int32_t                      winRows = 0;
L
liuyao 已提交
3883 3884 3885

  pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version);

5
54liuyao 已提交
3886
  if (pSDataBlock->pDataBlock != NULL) {
X
Xiaoyu Wang 已提交
3887 3888
    SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
    tsCols = (int64_t*)pColDataInfo->pData;
5
54liuyao 已提交
3889
  } else {
X
Xiaoyu Wang 已提交
3890
    return;
5
54liuyao 已提交
3891
  }
L
Liu Jicong 已提交
3892

5
54liuyao 已提交
3893
  SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
5
54liuyao 已提交
3894 3895
  int32_t              rows = pSDataBlock->info.rows;
  blockDataEnsureCapacity(pAggSup->pScanBlock, rows);
L
Liu Jicong 已提交
3896
  SColumnInfoData* pKeyColInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->stateCol.slotId);
5
54liuyao 已提交
3897
  for (int32_t i = 0; i < rows; i += winRows) {
3898
    if (pInfo->ignoreExpiredData && isOverdue(tsCols[i], &pInfo->twAggSup) || colDataIsNull_s(pKeyColInfo, i)) {
5
54liuyao 已提交
3899 3900 3901
      i++;
      continue;
    }
5
54liuyao 已提交
3902 3903 3904 3905 3906 3907 3908 3909 3910
    char*            pKeyData = colDataGetData(pKeyColInfo, i);
    int32_t          winIndex = 0;
    bool             allEqual = true;
    SStateWindowInfo curWin = {0};
    SStateWindowInfo nextWin = {0};
    setStateOutputBuf(pAggSup, tsCols[i], groupId, pKeyData, &curWin, &nextWin);
    setSessionWinOutputInfo(pSeUpdated, &curWin.winInfo);
    winRows = updateStateWindowInfo(&curWin, &nextWin, tsCols, groupId, pKeyColInfo, rows, i, &allEqual,
                                    pAggSup->pResultRows, pSeUpdated, pStDeleted);
5
54liuyao 已提交
3911
    if (!allEqual) {
3912
      uint64_t uid = 0;
5
54liuyao 已提交
3913 3914 3915 3916 3917
      appendOneRowToStreamSpecialBlock(pAggSup->pScanBlock, &curWin.winInfo.sessionWin.win.skey,
                                       &curWin.winInfo.sessionWin.win.ekey, &uid, &groupId, NULL);
      tSimpleHashRemove(pSeUpdated, &curWin.winInfo.sessionWin, sizeof(SSessionKey));
      doDeleteSessionWindow(pAggSup, &curWin.winInfo.sessionWin);
      releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)curWin.winInfo.pOutputBuf);
5
54liuyao 已提交
3918 3919
      continue;
    }
5
54liuyao 已提交
3920 3921
    code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &curWin.winInfo, &pResult, i, winRows, rows, numOfOutput,
                              pOperator);
5
54liuyao 已提交
3922
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
S
Shengliang Guan 已提交
3923
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
5
54liuyao 已提交
3924
    }
5
54liuyao 已提交
3925 3926
    saveSessionOutputBuf(pAggSup, &curWin.winInfo);

5
54liuyao 已提交
3927
    if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
5
54liuyao 已提交
3928
      code = saveResult(curWin.winInfo, pSeUpdated);
5
54liuyao 已提交
3929
      if (code != TSDB_CODE_SUCCESS) {
S
Shengliang Guan 已提交
3930
        T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
5
54liuyao 已提交
3931 3932
      }
    }
3933 3934

    if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
5
54liuyao 已提交
3935 3936
      SSessionKey key = {0};
      getSessionHashKey(&curWin.winInfo.sessionWin, &key);
3937 3938
      tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
    }
5
54liuyao 已提交
3939 3940 3941 3942 3943 3944 3945 3946
  }
}

static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

3947
  SExprSupp*                   pSup = &pOperator->exprSupp;
5
54liuyao 已提交
3948
  SStreamStateAggOperatorInfo* pInfo = pOperator->info;
X
Xiaoyu Wang 已提交
3949
  SOptrBasicInfo*              pBInfo = &pInfo->binfo;
5
54liuyao 已提交
3950
  if (pOperator->status == OP_RES_TO_RETURN) {
3951
    doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
5
54liuyao 已提交
3952
    if (pInfo->pDelRes->info.rows > 0) {
5
54liuyao 已提交
3953
      printDataBlock(pInfo->pDelRes, "single state delete");
5
54liuyao 已提交
3954 3955
      return pInfo->pDelRes;
    }
5
54liuyao 已提交
3956 3957 3958 3959 3960

    doBuildSessionResult(pOperator, pInfo->streamAggSup.pState, &pInfo->groupResInfo, pBInfo->pRes);
    if (pBInfo->pRes->info.rows > 0) {
      printDataBlock(pBInfo->pRes, "single state");
      return pBInfo->pRes;
5
54liuyao 已提交
3961
    }
5
54liuyao 已提交
3962

H
Haojun Liao 已提交
3963
    setOperatorCompleted(pOperator);
5
54liuyao 已提交
3964
    return NULL;
5
54liuyao 已提交
3965 3966 3967
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];
5
54liuyao 已提交
3968 3969 3970 3971
  if (!pInfo->pUpdated) {
    pInfo->pUpdated = taosArrayInit(16, sizeof(SSessionKey));
  }
  if (!pInfo->pSeUpdated) {
3972
    _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
5
54liuyao 已提交
3973 3974
    pInfo->pSeUpdated = tSimpleHashInit(64, hashFn);
  }
5
54liuyao 已提交
3975 3976 3977 3978 3979
  while (1) {
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
    if (pBlock == NULL) {
      break;
    }
5
54liuyao 已提交
3980
    printDataBlock(pBlock, "single state recv");
3981

5
54liuyao 已提交
3982 3983 3984 3985
    if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
        pBlock->info.type == STREAM_CLEAR) {
      SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
      doDeleteTimeWindows(&pInfo->streamAggSup, pBlock, pWins);
5
54liuyao 已提交
3986
      removeSessionResults(pInfo->pSeUpdated, pWins);
5
54liuyao 已提交
3987
      copyDeleteWindowInfo(pWins, pInfo->pSeDeleted);
3988 3989
      taosArrayDestroy(pWins);
      continue;
3990
    } else if (pBlock->info.type == STREAM_GET_ALL) {
5
54liuyao 已提交
3991
      getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pSeUpdated);
5
54liuyao 已提交
3992
      continue;
5
54liuyao 已提交
3993 3994 3995 3996
    } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
      return pBlock;
    } else {
      ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
5
54liuyao 已提交
3997
    }
3998

5
54liuyao 已提交
3999 4000 4001 4002
    if (pInfo->scalarSupp.pExprInfo != NULL) {
      SExprSupp* pExprSup = &pInfo->scalarSupp;
      projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
    }
4003
    // the pDataBlock are always the same one, no need to call this again
4004
    setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
5
54liuyao 已提交
4005 4006
    doStreamStateAggImpl(pOperator, pBlock, pInfo->pSeUpdated, pInfo->pSeDeleted);
    pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
5
54liuyao 已提交
4007 4008 4009
  }
  // restore the value
  pOperator->status = OP_RES_TO_RETURN;
X
Xiaoyu Wang 已提交
4010

5
54liuyao 已提交
4011 4012 4013 4014 4015
  closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pInfo->pSeUpdated);
  copyUpdateResult(pInfo->pSeUpdated, pInfo->pUpdated);
  removeSessionResults(pInfo->pSeDeleted, pInfo->pUpdated);
  tSimpleHashCleanup(pInfo->pSeUpdated);
  pInfo->pSeUpdated = NULL;
5
54liuyao 已提交
4016

5
54liuyao 已提交
4017 4018
  initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
  pInfo->pUpdated = NULL;
5
54liuyao 已提交
4019
  blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
5
54liuyao 已提交
4020

5
54liuyao 已提交
4021 4022 4023 4024 4025 4026
#if 0
  char* pBuf = streamStateSessionDump(pInfo->streamAggSup.pState);
  qDebug("===stream===final session%s", pBuf);
  taosMemoryFree(pBuf);
#endif

4027
  doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
5
54liuyao 已提交
4028
  if (pInfo->pDelRes->info.rows > 0) {
5
54liuyao 已提交
4029
    printDataBlock(pInfo->pDelRes, "single state delete");
5
54liuyao 已提交
4030 4031 4032
    return pInfo->pDelRes;
  }

5
54liuyao 已提交
4033 4034 4035 4036 4037
  doBuildSessionResult(pOperator, pInfo->streamAggSup.pState, &pInfo->groupResInfo, pBInfo->pRes);
  if (pBInfo->pRes->info.rows > 0) {
    printDataBlock(pBInfo->pRes, "single state");
    return pBInfo->pRes;
  }
H
Haojun Liao 已提交
4038
  setOperatorCompleted(pOperator);
5
54liuyao 已提交
4039
  return NULL;
4040 4041
}

X
Xiaoyu Wang 已提交
4042 4043 4044 4045 4046
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
                                                SExecTaskInfo* pTaskInfo) {
  SStreamStateWinodwPhysiNode* pStateNode = (SStreamStateWinodwPhysiNode*)pPhyNode;
  int32_t                      tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
  SColumnNode*                 pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr;
H
Haojun Liao 已提交
4047
  int32_t                      code = TSDB_CODE_SUCCESS;
5
54liuyao 已提交
4048

X
Xiaoyu Wang 已提交
4049 4050
  SStreamStateAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamStateAggOperatorInfo));
  SOperatorInfo*               pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
5
54liuyao 已提交
4051
  if (pInfo == NULL || pOperator == NULL) {
H
Haojun Liao 已提交
4052
    code = TSDB_CODE_OUT_OF_MEMORY;
5
54liuyao 已提交
4053 4054 4055 4056
    goto _error;
  }

  pInfo->stateCol = extractColumnFromColumnNode(pColNode);
4057
  initResultSizeInfo(&pOperator->resultInfo, 4096);
5
54liuyao 已提交
4058 4059 4060
  if (pStateNode->window.pExprs != NULL) {
    int32_t    numOfScalar = 0;
    SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalar);
H
Haojun Liao 已提交
4061
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar);
5
54liuyao 已提交
4062 4063 4064 4065 4066
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

X
Xiaoyu Wang 已提交
4067 4068
  pInfo->twAggSup = (STimeWindowAggSupp){
      .waterMark = pStateNode->window.watermark,
5
54liuyao 已提交
4069 4070
      .calTrigger = pStateNode->window.triggerType,
      .maxTs = INT64_MIN,
5
54liuyao 已提交
4071
      .minTs = INT64_MAX,
X
Xiaoyu Wang 已提交
4072
  };
4073

5
54liuyao 已提交
4074
  initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
4075

5
54liuyao 已提交
4076 4077 4078
  SExprSupp*   pSup = &pOperator->exprSupp;
  int32_t      numOfCols = 0;
  SExprInfo*   pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &numOfCols);
H
Haojun Liao 已提交
4079
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
4080
  code = initBasicInfoEx(&pInfo->binfo, pSup, pExprInfo, numOfCols, pResBlock);
5
54liuyao 已提交
4081 4082 4083
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
5
54liuyao 已提交
4084 4085 4086 4087
  int32_t keySize = sizeof(SStateKeys) + pColNode->node.resType.bytes;
  int16_t type = pColNode->node.resType.type;
  code = initStreamAggSupporter(&pInfo->streamAggSup, pSup->pCtx, numOfCols, 0, pTaskInfo->streamInfo.pState, keySize,
                                type);
5
54liuyao 已提交
4088 4089 4090 4091 4092 4093
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->primaryTsIndex = tsSlotId;
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
5
54liuyao 已提交
4094
  pInfo->pSeDeleted = tSimpleHashInit(64, hashFn);
5
54liuyao 已提交
4095
  pInfo->pDelIterator = NULL;
H
Haojun Liao 已提交
4096
  pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
5
54liuyao 已提交
4097
  pInfo->pChildren = NULL;
5
54liuyao 已提交
4098
  pInfo->ignoreExpiredData = pStateNode->window.igExpired;
4099
  pInfo->ignoreExpiredDataSaved = false;
5
54liuyao 已提交
4100 4101
  pInfo->pUpdated = NULL;
  pInfo->pSeUpdated = NULL;
L
liuyao 已提交
4102
  pInfo->dataVersion = 0;
5
54liuyao 已提交
4103

L
Liu Jicong 已提交
4104 4105
  setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
L
Liu Jicong 已提交
4106 4107
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo,
                                         optrDefaultBufFn, NULL);
5
54liuyao 已提交
4108
  initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup);
5
54liuyao 已提交
4109 4110 4111 4112 4113 4114 4115
  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
  return pOperator;

_error:
4116
  destroyStreamStateOperatorInfo(pInfo);
5
54liuyao 已提交
4117 4118 4119 4120
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}
4121

4122
void destroyMAIOperatorInfo(void* param) {
4123
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = (SMergeAlignedIntervalAggOperatorInfo*)param;
4124
  destroyIntervalOperatorInfo(miaInfo->intervalAggOperatorInfo);
D
dapan1121 已提交
4125
  taosMemoryFreeClear(param);
4126 4127
}

4128
static SResultRow* doSetSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, SAggSupporter* pSup) {
H
Haojun Liao 已提交
4129
  SResultRow* pResult = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
4130 4131 4132
  if (NULL == pResult) {
    return pResult;
  }
H
Haojun Liao 已提交
4133
  pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
4134 4135
  return pResult;
}
4136

4137 4138 4139 4140 4141 4142 4143 4144
static int32_t setSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult,
                                       SExprSupp* pExprSup, SAggSupporter* pAggSup) {
  if (*pResult == NULL) {
    *pResult = doSetSingleOutputTupleBuf(pResultRowInfo, pAggSup);
    if (*pResult == NULL) {
      return terrno;
    }
  }
4145

4146
  // set time window for current result
4147 4148
  (*pResult)->win = (*win);
  setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
4149
  return TSDB_CODE_SUCCESS;
4150 4151
}

4152
static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo,
4153
                                          SSDataBlock* pBlock, SSDataBlock* pResultBlock) {
4154
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
D
dapan1121 已提交
4155
  SIntervalAggOperatorInfo*             iaInfo = miaInfo->intervalAggOperatorInfo;
4156 4157

  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
4158
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;
4159
  SInterval*     pInterval = &iaInfo->interval;
4160

5
54liuyao 已提交
4161 4162
  int32_t  startPos = 0;
  int64_t* tsCols = extractTsCol(pBlock, iaInfo);
4163

4164 4165
  TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);

4166 4167
  // there is an result exists
  if (miaInfo->curTs != INT64_MIN) {
4168
    if (ts != miaInfo->curTs) {
4169
      finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
4170
      resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
4171
      miaInfo->curTs = ts;
4172
    }
4173 4174
  } else {
    miaInfo->curTs = ts;
4175 4176 4177
  }

  STimeWindow win = {0};
4178
  win.skey = miaInfo->curTs;
4179
  win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
4180

5
54liuyao 已提交
4181
  int32_t ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
4182 4183
  if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
    T_LONG_JMP(pTaskInfo->env, ret);
4184 4185
  }

4186 4187
  int32_t currPos = startPos;

4188
  STimeWindow currWin = win;
4189
  while (++currPos < pBlock->info.rows) {
4190
    if (tsCols[currPos] == miaInfo->curTs) {
4191
      continue;
4192 4193 4194
    }

    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true);
H
Haojun Liao 已提交
4195 4196
    applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
                                    currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
4197

4198
    finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
4199
    resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
4200
    miaInfo->curTs = tsCols[currPos];
4201

4202
    currWin.skey = miaInfo->curTs;
4203
    currWin.ekey = taosTimeAdd(currWin.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
4204 4205

    startPos = currPos;
5
54liuyao 已提交
4206
    ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
4207 4208
    if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
      T_LONG_JMP(pTaskInfo->env, ret);
4209
    }
4210 4211

    miaInfo->curTs = currWin.skey;
4212
  }
4213

4214
  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true);
H
Haojun Liao 已提交
4215
  applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos,
L
Liu Jicong 已提交
4216
                                  pBlock->info.rows, pSup->numOfExprs);
4217 4218
}

4219
static void cleanupAfterGroupResultGen(SMergeAlignedIntervalAggOperatorInfo* pMiaInfo, SSDataBlock* pRes) {
H
Haojun Liao 已提交
4220
  pRes->info.id.groupId = pMiaInfo->groupId;
4221 4222
  pMiaInfo->curTs = INT64_MIN;
  pMiaInfo->groupId = 0;
4223 4224
}

4225
static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
S
shenglian zhou 已提交
4226
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
4227

4228 4229
  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
  SIntervalAggOperatorInfo*             pIaInfo = pMiaInfo->intervalAggOperatorInfo;
4230

4231 4232 4233 4234 4235
  SExprSupp*      pSup = &pOperator->exprSupp;
  SSDataBlock*    pRes = pIaInfo->binfo.pRes;
  SResultRowInfo* pResultRowInfo = &pIaInfo->binfo.resultRowInfo;
  SOperatorInfo*  downstream = pOperator->pDownstream[0];
  int32_t         scanFlag = MAIN_SCAN;
4236

4237 4238
  while (1) {
    SSDataBlock* pBlock = NULL;
4239
    if (pMiaInfo->prefetchedBlock == NULL) {
4240 4241
      pBlock = downstream->fpSet.getNextFn(downstream);
    } else {
4242 4243
      pBlock = pMiaInfo->prefetchedBlock;
      pMiaInfo->prefetchedBlock = NULL;
4244

H
Haojun Liao 已提交
4245
      pMiaInfo->groupId = pBlock->info.id.groupId;
4246
    }
4247

4248
    // no data exists, all query processing is done
4249
    if (pBlock == NULL) {
4250 4251 4252
      // close last unclosed time window
      if (pMiaInfo->curTs != INT64_MIN) {
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
4253 4254
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
4255
      }
4256

H
Haojun Liao 已提交
4257
      setOperatorCompleted(pOperator);
4258
      break;
4259
    }
4260

H
Haojun Liao 已提交
4261
    if (pMiaInfo->groupId == 0) {
H
Haojun Liao 已提交
4262 4263 4264
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
        pMiaInfo->groupId = pBlock->info.id.groupId;
        pRes->info.id.groupId = pMiaInfo->groupId;
H
Haojun Liao 已提交
4265 4266
      }
    } else {
H
Haojun Liao 已提交
4267
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
H
Haojun Liao 已提交
4268
        // if there are unclosed time window, close it firstly.
4269
        ASSERT(pMiaInfo->curTs != INT64_MIN);
H
Haojun Liao 已提交
4270
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
4271
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
H
Haojun Liao 已提交
4272

4273 4274
        pMiaInfo->prefetchedBlock = pBlock;
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
H
Haojun Liao 已提交
4275
        break;
5
54liuyao 已提交
4276
      } else {
H
Haojun Liao 已提交
4277
        // continue
H
Haojun Liao 已提交
4278
        pRes->info.id.groupId = pMiaInfo->groupId;
H
Haojun Liao 已提交
4279
      }
4280
    }
4281

4282
    getTableScanInfo(pOperator, &pIaInfo->inputOrder, &scanFlag, false);
4283
    setInputDataBlock(pSup, pBlock, pIaInfo->inputOrder, scanFlag, true);
4284
    doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes);
4285

H
Haojun Liao 已提交
4286
    doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
4287 4288 4289
    if (pRes->info.rows >= pOperator->resultInfo.capacity) {
      break;
    }
4290
  }
4291 4292 4293 4294 4295 4296 4297 4298 4299 4300 4301 4302 4303 4304 4305
}

static SSDataBlock* mergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
  SIntervalAggOperatorInfo*             iaInfo = pMiaInfo->intervalAggOperatorInfo;
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SSDataBlock* pRes = iaInfo->binfo.pRes;
  blockDataCleanup(pRes);

  if (iaInfo->binfo.mergeResultBlock) {
dengyihao's avatar
dengyihao 已提交
4306
    while (1) {
4307
      if (pOperator->status == OP_EXEC_DONE) {
4308 4309
        break;
      }
4310

4311
      if (pRes->info.rows >= pOperator->resultInfo.threshold) {
4312 4313 4314
        break;
      }

4315 4316 4317 4318
      doMergeAlignedIntervalAgg(pOperator);
    }
  } else {
    doMergeAlignedIntervalAgg(pOperator);
4319 4320 4321 4322 4323 4324 4325
  }

  size_t rows = pRes->info.rows;
  pOperator->resultInfo.totalRows += rows;
  return (rows == 0) ? NULL : pRes;
}

4326
SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode,
4327
                                                      SExecTaskInfo* pTaskInfo) {
4328
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedIntervalAggOperatorInfo));
4329
  SOperatorInfo*                        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
4330 4331 4332 4333
  if (miaInfo == NULL || pOperator == NULL) {
    goto _error;
  }

D
dapan1121 已提交
4334 4335 4336 4337 4338
  miaInfo->intervalAggOperatorInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
  if (miaInfo->intervalAggOperatorInfo == NULL) {
    goto _error;
  }

4339 4340 4341 4342 4343 4344 4345
  SInterval interval = {.interval = pNode->interval,
                        .sliding = pNode->sliding,
                        .intervalUnit = pNode->intervalUnit,
                        .slidingUnit = pNode->slidingUnit,
                        .offset = pNode->offset,
                        .precision = ((SColumnNode*)pNode->window.pTspk)->node.resType.precision};

D
dapan1121 已提交
4346
  SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
4347
  SExprSupp*                pSup = &pOperator->exprSupp;
4348

H
Haojun Liao 已提交
4349 4350 4351 4352 4353
  int32_t code = filterInitFromNode((SNode*)pNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

L
Liu Jicong 已提交
4354 4355 4356 4357
  miaInfo->curTs = INT64_MIN;
  iaInfo->win = pTaskInfo->window;
  iaInfo->inputOrder = TSDB_ORDER_ASC;
  iaInfo->interval = interval;
4358 4359
  iaInfo->primaryTsIndex = ((SColumnNode*)pNode->window.pTspk)->slotId;
  iaInfo->binfo.mergeResultBlock = pNode->window.mergeDataBlock;
4360 4361

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
H
Haojun Liao 已提交
4362
  initResultSizeInfo(&pOperator->resultInfo, 512);
4363

H
Haojun Liao 已提交
4364 4365
  int32_t    num = 0;
  SExprInfo* pExprInfo = createExprInfo(pNode->window.pFuncs, NULL, &num);
H
Haojun Liao 已提交
4366

L
Liu Jicong 已提交
4367 4368
  code = initAggSup(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
                    pTaskInfo->streamInfo.pState);
4369 4370 4371
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
4372

H
Haojun Liao 已提交
4373
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pNode->window.node.pOutputDataBlockDesc);
4374
  initBasicInfo(&iaInfo->binfo, pResBlock);
4375
  initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);
4376

4377
  iaInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, num, iaInfo);
4378
  if (iaInfo->timeWindowInterpo) {
4379
    iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
4380 4381
  }

4382
  initResultRowInfo(&iaInfo->binfo.resultRowInfo);
4383
  blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity);
L
Liu Jicong 已提交
4384 4385
  setOperatorInfo(pOperator, "TimeMergeAlignedIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL,
                  false, OP_NOT_OPENED, miaInfo, pTaskInfo);
4386

L
Liu Jicong 已提交
4387 4388
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignedIntervalAgg, NULL, destroyMAIOperatorInfo,
                                         optrDefaultBufFn, NULL);
4389 4390 4391 4392 4393 4394 4395 4396 4397

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
4398
  destroyMAIOperatorInfo(miaInfo);
4399 4400 4401 4402
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}
4403 4404 4405 4406 4407

//=====================================================================================================================
// merge interval operator
typedef struct SMergeIntervalAggOperatorInfo {
  SIntervalAggOperatorInfo intervalAggOperatorInfo;
L
Liu Jicong 已提交
4408 4409 4410 4411 4412 4413
  SList*                   groupIntervals;
  SListIter                groupIntervalsIter;
  bool                     hasGroupId;
  uint64_t                 groupId;
  SSDataBlock*             prefetchedBlock;
  bool                     inputBlocksFinished;
4414 4415
} SMergeIntervalAggOperatorInfo;

S
slzhou 已提交
4416
typedef struct SGroupTimeWindow {
L
Liu Jicong 已提交
4417
  uint64_t    groupId;
S
slzhou 已提交
4418 4419 4420
  STimeWindow window;
} SGroupTimeWindow;

4421
void destroyMergeIntervalOperatorInfo(void* param) {
4422
  SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
S
slzhou 已提交
4423
  tdListFree(miaInfo->groupIntervals);
4424
  destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo);
4425

D
dapan1121 已提交
4426
  taosMemoryFreeClear(param);
4427 4428
}

L
Liu Jicong 已提交
4429 4430
static int32_t finalizeWindowResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, STimeWindow* win,
                                    SSDataBlock* pResultBlock) {
4431 4432 4433
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
  SExecTaskInfo*                 pTaskInfo = pOperatorInfo->pTaskInfo;
4434
  bool                           ascScan = (iaInfo->inputOrder == TSDB_ORDER_ASC);
4435 4436 4437
  SExprSupp*                     pExprSup = &pOperatorInfo->exprSupp;

  SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &win->skey, TSDB_KEYSIZE, tableGroupId);
L
Liu Jicong 已提交
4438 4439
  SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(
      iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
4440
  ASSERT(p1 != NULL);
5
54liuyao 已提交
4441
  //  finalizeResultRows(iaInfo->aggSup.pResultBuf, p1, pResultBlock, pTaskInfo);
4442
  tSimpleHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
4443 4444 4445
  return TSDB_CODE_SUCCESS;
}

4446 4447 4448 4449
static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock,
                                        STimeWindow* newWin) {
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
4450
  bool                           ascScan = (iaInfo->inputOrder == TSDB_ORDER_ASC);
4451

S
slzhou 已提交
4452 4453
  SGroupTimeWindow groupTimeWindow = {.groupId = tableGroupId, .window = *newWin};
  tdListAppend(miaInfo->groupIntervals, &groupTimeWindow);
4454

S
slzhou 已提交
4455 4456 4457 4458 4459
  SListIter iter = {0};
  tdListInitIter(miaInfo->groupIntervals, &iter, TD_LIST_FORWARD);
  SListNode* listNode = NULL;
  while ((listNode = tdListNext(&iter)) != NULL) {
    SGroupTimeWindow* prevGrpWin = (SGroupTimeWindow*)listNode->data;
L
Liu Jicong 已提交
4460
    if (prevGrpWin->groupId != tableGroupId) {
S
slzhou 已提交
4461 4462
      continue;
    }
4463

S
slzhou 已提交
4464
    STimeWindow* prevWin = &prevGrpWin->window;
H
Haojun Liao 已提交
4465
    if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) {
5
54liuyao 已提交
4466
      //      finalizeWindowResult(pOperatorInfo, tableGroupId, prevWin, pResultBlock);
S
slzhou 已提交
4467 4468
      tdListPopNode(miaInfo->groupIntervals, listNode);
    }
4469 4470 4471 4472 4473 4474 4475 4476 4477 4478 4479 4480 4481 4482 4483 4484
  }

  return 0;
}

static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
                                   int32_t scanFlag, SSDataBlock* pResultBlock) {
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;

  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
  SExprSupp*     pExprSup = &pOperatorInfo->exprSupp;

  int32_t     startPos = 0;
  int32_t     numOfOutput = pExprSup->numOfExprs;
  int64_t*    tsCols = extractTsCol(pBlock, iaInfo);
H
Haojun Liao 已提交
4485
  uint64_t    tableGroupId = pBlock->info.id.groupId;
4486
  bool        ascScan = (iaInfo->inputOrder == TSDB_ORDER_ASC);
4487 4488 4489
  TSKEY       blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
  SResultRow* pResult = NULL;

4490 4491
  STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval,
                                        iaInfo->inputOrder);
4492 4493 4494 4495 4496

  int32_t ret =
      setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
                             numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
  if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
S
Shengliang Guan 已提交
4497
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
4498 4499 4500 4501
  }

  TSKEY   ekey = ascScan ? win.ekey : win.skey;
  int32_t forwardRows =
4502
      getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->inputOrder);
4503
  ASSERT(forwardRows > 0);
4504 4505 4506

  // prev time window not interpolation yet.
  if (iaInfo->timeWindowInterpo) {
4507
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId);
4508 4509 4510 4511 4512 4513
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);

    // restore current time window
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
                                 numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
    if (ret != TSDB_CODE_SUCCESS) {
S
Shengliang Guan 已提交
4514
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
4515 4516 4517 4518 4519 4520 4521
    }

    // window start key interpolation
    doWindowBorderInterpolation(iaInfo, pBlock, pResult, &win, startPos, forwardRows, pExprSup);
  }

  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, true);
H
Haojun Liao 已提交
4522
  applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows,
L
Liu Jicong 已提交
4523
                                  pBlock->info.rows, numOfOutput);
4524 4525 4526 4527 4528 4529 4530 4531
  doCloseWindow(pResultRowInfo, iaInfo, pResult);

  // output previous interval results after this interval (&win) is closed
  outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &win);

  STimeWindow nextWin = win;
  while (1) {
    int32_t prevEndPos = forwardRows - 1 + startPos;
4532 4533
    startPos =
        getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, iaInfo->inputOrder);
4534 4535 4536 4537 4538 4539 4540 4541 4542
    if (startPos < 0) {
      break;
    }

    // null data, failed to allocate more memory buffer
    int32_t code =
        setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
                               pExprSup->pCtx, numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
S
Shengliang Guan 已提交
4543
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
4544 4545 4546 4547
    }

    ekey = ascScan ? nextWin.ekey : nextWin.skey;
    forwardRows =
4548
        getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->inputOrder);
4549 4550 4551 4552 4553

    // window start(end) key interpolation
    doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup);

    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, true);
H
Haojun Liao 已提交
4554
    applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows,
L
Liu Jicong 已提交
4555
                                    pBlock->info.rows, numOfOutput);
4556 4557 4558 4559 4560 4561 4562 4563 4564 4565 4566 4567 4568 4569 4570 4571 4572 4573 4574 4575 4576 4577 4578 4579 4580 4581 4582 4583 4584 4585 4586 4587 4588 4589 4590
    doCloseWindow(pResultRowInfo, iaInfo, pResult);

    // output previous interval results after this interval (&nextWin) is closed
    outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &nextWin);
  }

  if (iaInfo->timeWindowInterpo) {
    saveDataBlockLastRow(iaInfo->pPrevValues, pBlock, iaInfo->pInterpCols);
  }
}

static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  SMergeIntervalAggOperatorInfo* miaInfo = pOperator->info;
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
  SExprSupp*                     pExpSupp = &pOperator->exprSupp;

  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SSDataBlock* pRes = iaInfo->binfo.pRes;
  blockDataCleanup(pRes);
  blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);

  if (!miaInfo->inputBlocksFinished) {
    SOperatorInfo* downstream = pOperator->pDownstream[0];
    int32_t        scanFlag = MAIN_SCAN;
    while (1) {
      SSDataBlock* pBlock = NULL;
      if (miaInfo->prefetchedBlock == NULL) {
        pBlock = downstream->fpSet.getNextFn(downstream);
      } else {
        pBlock = miaInfo->prefetchedBlock;
H
Haojun Liao 已提交
4591
        miaInfo->groupId = pBlock->info.id.groupId;
4592 4593 4594 4595
        miaInfo->prefetchedBlock = NULL;
      }

      if (pBlock == NULL) {
S
slzhou 已提交
4596
        tdListInitIter(miaInfo->groupIntervals, &miaInfo->groupIntervalsIter, TD_LIST_FORWARD);
4597 4598 4599 4600 4601 4602
        miaInfo->inputBlocksFinished = true;
        break;
      }

      if (!miaInfo->hasGroupId) {
        miaInfo->hasGroupId = true;
H
Haojun Liao 已提交
4603 4604
        miaInfo->groupId = pBlock->info.id.groupId;
      } else if (miaInfo->groupId != pBlock->info.id.groupId) {
4605 4606 4607 4608
        miaInfo->prefetchedBlock = pBlock;
        break;
      }

4609
      getTableScanInfo(pOperator, &iaInfo->inputOrder, &scanFlag, false);
4610
      setInputDataBlock(pExpSupp, pBlock, iaInfo->inputOrder, scanFlag, true);
4611 4612 4613 4614 4615 4616 4617
      doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes);

      if (pRes->info.rows >= pOperator->resultInfo.threshold) {
        break;
      }
    }

H
Haojun Liao 已提交
4618
    pRes->info.id.groupId = miaInfo->groupId;
4619 4620 4621
  }

  if (miaInfo->inputBlocksFinished) {
S
slzhou 已提交
4622
    SListNode* listNode = tdListNext(&miaInfo->groupIntervalsIter);
4623

S
slzhou 已提交
4624 4625
    if (listNode != NULL) {
      SGroupTimeWindow* grpWin = (SGroupTimeWindow*)(listNode->data);
5
54liuyao 已提交
4626
      //      finalizeWindowResult(pOperator, grpWin->groupId, &grpWin->window, pRes);
H
Haojun Liao 已提交
4627
      pRes->info.id.groupId = grpWin->groupId;
4628 4629 4630 4631
    }
  }

  if (pRes->info.rows == 0) {
H
Haojun Liao 已提交
4632
    setOperatorCompleted(pOperator);
4633 4634 4635 4636 4637 4638 4639
  }

  size_t rows = pRes->info.rows;
  pOperator->resultInfo.totalRows += rows;
  return (rows == 0) ? NULL : pRes;
}

4640 4641 4642
SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode,
                                               SExecTaskInfo* pTaskInfo) {
  SMergeIntervalAggOperatorInfo* pMergeIntervalInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo));
4643
  SOperatorInfo*                 pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
4644
  if (pMergeIntervalInfo == NULL || pOperator == NULL) {
4645 4646 4647
    goto _error;
  }

5
54liuyao 已提交
4648 4649
  int32_t    num = 0;
  SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
4650 4651 4652 4653 4654 4655 4656

  SInterval interval = {.interval = pIntervalPhyNode->interval,
                        .sliding = pIntervalPhyNode->sliding,
                        .intervalUnit = pIntervalPhyNode->intervalUnit,
                        .slidingUnit = pIntervalPhyNode->slidingUnit,
                        .offset = pIntervalPhyNode->offset,
                        .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
4657

4658
  pMergeIntervalInfo->groupIntervals = tdListNew(sizeof(SGroupTimeWindow));
4659

4660
  SIntervalAggOperatorInfo* pIntervalInfo = &pMergeIntervalInfo->intervalAggOperatorInfo;
L
Liu Jicong 已提交
4661
  pIntervalInfo->win = pTaskInfo->window;
4662
  pIntervalInfo->inputOrder = TSDB_ORDER_ASC;
L
Liu Jicong 已提交
4663
  pIntervalInfo->interval = interval;
4664 4665
  pIntervalInfo->binfo.mergeResultBlock = pIntervalPhyNode->window.mergeDataBlock;
  pIntervalInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
4666 4667 4668 4669

  SExprSupp* pExprSupp = &pOperator->exprSupp;

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
4670
  initResultSizeInfo(&pOperator->resultInfo, 4096);
4671

L
Liu Jicong 已提交
4672 4673
  int32_t code = initAggSup(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
                            pTaskInfo->streamInfo.pState);
4674 4675 4676
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
4677

H
Haojun Liao 已提交
4678
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pIntervalPhyNode->window.node.pOutputDataBlockDesc);
4679 4680
  initBasicInfo(&pIntervalInfo->binfo, pResBlock);
  initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win);
4681

4682 4683
  pIntervalInfo->timeWindowInterpo = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo);
  if (pIntervalInfo->timeWindowInterpo) {
4684
    pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
4685
    if (pIntervalInfo->binfo.resultRowInfo.openWindow == NULL) {
4686 4687 4688 4689
      goto _error;
    }
  }

4690
  initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo);
L
Liu Jicong 已提交
4691 4692
  setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false,
                  OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo);
L
Liu Jicong 已提交
4693 4694
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeIntervalAgg, NULL, destroyMergeIntervalOperatorInfo,
                                         optrDefaultBufFn, NULL);
4695 4696 4697 4698 4699 4700 4701 4702 4703

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
H
Haojun Liao 已提交
4704 4705 4706 4707
  if (pMergeIntervalInfo != NULL) {
    destroyMergeIntervalOperatorInfo(pMergeIntervalInfo);
  }

4708 4709 4710
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
L
Liu Jicong 已提交
4711
}
4712 4713 4714

static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
  SStreamIntervalOperatorInfo* pInfo = pOperator->info;
4715 4716
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
  SExprSupp*                   pSup = &pOperator->exprSupp;
4717 4718 4719 4720 4721 4722

  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  if (pOperator->status == OP_RES_TO_RETURN) {
4723
    doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
4724
    if (pInfo->pDelRes->info.rows > 0) {
5
54liuyao 已提交
4725
      printDataBlock(pInfo->pDelRes, "single interval delete");
4726 4727 4728
      return pInfo->pDelRes;
    }

4729
    doBuildStreamIntervalResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
5
54liuyao 已提交
4730 4731 4732
    if (pInfo->binfo.pRes->info.rows > 0) {
      printDataBlock(pInfo->binfo.pRes, "single interval");
      return pInfo->binfo.pRes;
4733
    }
H
Haojun Liao 已提交
4734
    setOperatorCompleted(pOperator);
L
liuyao 已提交
4735 4736
    if (pInfo->twAggSup.maxTs - pInfo->twAggSup.checkPointInterval > pInfo->twAggSup.checkPointTs) {
      streamStateCommit(pInfo->pState);
L
liuyao 已提交
4737
      streamStateDeleteCheckPoint(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark);
L
liuyao 已提交
4738
      setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId);
L
liuyao 已提交
4739 4740
      pInfo->twAggSup.checkPointTs = pInfo->twAggSup.maxTs;
    }
5
54liuyao 已提交
4741
    return NULL;
4742 4743 4744 4745
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];

5
54liuyao 已提交
4746
  if (!pInfo->pUpdated) {
L
liuyao 已提交
4747
    pInfo->pUpdated = taosArrayInit(4096, POINTER_BYTES);
5
54liuyao 已提交
4748 4749 4750
  }
  if (!pInfo->pUpdatedMap) {
    _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
L
liuyao 已提交
4751
    pInfo->pUpdatedMap = tSimpleHashInit(4096, hashFn);
5
54liuyao 已提交
4752 4753
  }

4754 4755 4756
  while (1) {
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
    if (pBlock == NULL) {
5
54liuyao 已提交
4757 4758
      qDebug("===stream===return data:single interval. recv datablock num:%" PRIu64, pInfo->numOfDatapack);
      pInfo->numOfDatapack = 0;
4759 4760
      break;
    }
5
54liuyao 已提交
4761
    pInfo->numOfDatapack++;
4762 4763
    printDataBlock(pBlock, "single interval recv");

4764 4765
    if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
        pBlock->info.type == STREAM_CLEAR) {
5
54liuyao 已提交
4766
      doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pInfo->pUpdatedMap);
4767 4768
      continue;
    } else if (pBlock->info.type == STREAM_GET_ALL) {
5
54liuyao 已提交
4769
      getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap);
4770
      continue;
5
54liuyao 已提交
4771
    } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
5
54liuyao 已提交
4772
      printDataBlock(pBlock, "single interval");
5
54liuyao 已提交
4773 4774 4775
      return pBlock;
    } else {
      ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
4776 4777 4778 4779 4780 4781 4782 4783 4784 4785 4786 4787 4788 4789 4790
    }

    if (pBlock->info.type == STREAM_NORMAL && pBlock->info.version != 0) {
      // set input version
      pTaskInfo->version = pBlock->info.version;
    }

    if (pInfo->scalarSupp.pExprInfo != NULL) {
      SExprSupp* pExprSup = &pInfo->scalarSupp;
      projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
    }

    // The timewindow that overlaps the timestamps of the input pBlock need to be recalculated and return to the
    // caller. Note that all the time window are not close till now.
    // the pDataBlock are always the same one, no need to call this again
4791
    setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
4792 4793 4794 4795
    if (pInfo->invertible) {
      setInverFunction(pSup->pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.type);
    }

5
54liuyao 已提交
4796
    doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pInfo->pUpdatedMap);
5
54liuyao 已提交
4797 4798
    pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
    pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.skey);
4799 4800
  }
  pOperator->status = OP_RES_TO_RETURN;
5
54liuyao 已提交
4801
  removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins);
4802 4803
  closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL,
                            pInfo->pUpdatedMap, pInfo->pDelWins, pOperator);
4804

4805 4806 4807
  void*   pIte = NULL;
  int32_t iter = 0;
  while ((pIte = tSimpleHashIterate(pInfo->pUpdatedMap, pIte, &iter)) != NULL) {
5
54liuyao 已提交
4808
    taosArrayPush(pInfo->pUpdated, pIte);
4809
  }
5
54liuyao 已提交
4810
  taosArraySort(pInfo->pUpdated, winPosCmprImpl);
4811

5
54liuyao 已提交
4812 4813
  initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
  pInfo->pUpdated = NULL;
4814
  blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
4815
  tSimpleHashCleanup(pInfo->pUpdatedMap);
5
54liuyao 已提交
4816
  pInfo->pUpdatedMap = NULL;
5
54liuyao 已提交
4817

5
54liuyao 已提交
4818 4819 4820 4821 4822 4823
#if 0
  char* pBuf = streamStateIntervalDump(pInfo->pState);
  qDebug("===stream===interval state%s", pBuf);
  taosMemoryFree(pBuf);
#endif

4824
  doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
4825
  if (pInfo->pDelRes->info.rows > 0) {
5
54liuyao 已提交
4826
    printDataBlock(pInfo->pDelRes, "single interval delete");
4827 4828 4829
    return pInfo->pDelRes;
  }

4830
  doBuildStreamIntervalResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
5
54liuyao 已提交
4831 4832 4833 4834 4835 4836
  if (pInfo->binfo.pRes->info.rows > 0) {
    printDataBlock(pInfo->binfo.pRes, "single interval");
    return pInfo->binfo.pRes;
  }

  return NULL;
4837 4838 4839 4840 4841
}

SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
                                                SExecTaskInfo* pTaskInfo) {
  SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo));
4842
  SOperatorInfo*               pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
4843 4844 4845 4846 4847
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }
  SStreamIntervalPhysiNode* pIntervalPhyNode = (SStreamIntervalPhysiNode*)pPhyNode;

H
Haojun Liao 已提交
4848
  int32_t    code = TSDB_CODE_SUCCESS;
4849 4850
  int32_t    numOfCols = 0;
  SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols);
H
Haojun Liao 已提交
4851

H
Haojun Liao 已提交
4852
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
dengyihao's avatar
dengyihao 已提交
4853 4854 4855 4856 4857 4858 4859
  pInfo->interval = (SInterval){
      .interval = pIntervalPhyNode->interval,
      .sliding = pIntervalPhyNode->sliding,
      .intervalUnit = pIntervalPhyNode->intervalUnit,
      .slidingUnit = pIntervalPhyNode->slidingUnit,
      .offset = pIntervalPhyNode->offset,
      .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision,
4860
  };
H
Haojun Liao 已提交
4861

dengyihao's avatar
dengyihao 已提交
4862
  pInfo->twAggSup = (STimeWindowAggSupp){
4863 4864 4865
      .waterMark = pIntervalPhyNode->window.watermark,
      .calTrigger = pIntervalPhyNode->window.triggerType,
      .maxTs = INT64_MIN,
5
54liuyao 已提交
4866
      .minTs = INT64_MAX,
5
54liuyao 已提交
4867
      .deleteMark = getDeleteMark(pIntervalPhyNode),
L
liuyao 已提交
4868
      .checkPointTs = 0,
dengyihao's avatar
dengyihao 已提交
4869 4870
      .checkPointInterval =
          convertTimePrecision(tsCheckpointInterval, TSDB_TIME_PRECISION_MILLI, pInfo->interval.precision),
4871
  };
H
Haojun Liao 已提交
4872

L
liuyao 已提交
4873
  ASSERTS(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay");
4874

4875 4876
  pOperator->pTaskInfo = pTaskInfo;
  pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired;
4877
  pInfo->ignoreExpiredDataSaved = false;
4878 4879 4880
  pInfo->isFinal = false;

  SExprSupp* pSup = &pOperator->exprSupp;
H
Haojun Liao 已提交
4881 4882 4883 4884
  initBasicInfo(&pInfo->binfo, pResBlock);
  initStreamFunciton(pSup->pCtx, pSup->numOfExprs);
  initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);

4885
  pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
4886
  initResultSizeInfo(&pOperator->resultInfo, 4096);
4887

4888
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
L
Liu Jicong 已提交
4889 4890
  code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
                    pTaskInfo->streamInfo.pState);
4891 4892 4893 4894
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
4895 4896 4897 4898 4899 4900 4901 4902
  if (pIntervalPhyNode->window.pExprs != NULL) {
    int32_t    numOfScalar = 0;
    SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar);
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }
4903 4904

  pInfo->invertible = allInvertible(pSup->pCtx, numOfCols);
4905
  pInfo->invertible = false;
4906 4907 4908 4909 4910
  pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
  pInfo->delIndex = 0;
  pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
  initResultRowInfo(&pInfo->binfo.resultRowInfo);

4911 4912 4913 4914 4915 4916 4917 4918 4919 4920
  pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
  *(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
  streamStateSetNumber(pInfo->pState, -1);

  pInfo->pPhyNode = NULL;  // create new child
  pInfo->pPullDataMap = NULL;
  pInfo->pPullWins = NULL;  // SPullWindowInfo
  pInfo->pullIndex = 0;
  pInfo->pPullDataRes = NULL;
  pInfo->isFinal = false;
L
liuyao 已提交
4921
  pInfo->numOfChild = 0;
4922 4923
  pInfo->delKey.ts = INT64_MAX;
  pInfo->delKey.groupId = 0;
5
54liuyao 已提交
4924
  pInfo->numOfDatapack = 0;
5
54liuyao 已提交
4925 4926
  pInfo->pUpdated = NULL;
  pInfo->pUpdatedMap = NULL;
dengyihao's avatar
dengyihao 已提交
4927 4928
  pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize,
                                                  compareTs, pInfo->pState, pInfo->twAggSup.deleteMark);
4929

L
Liu Jicong 已提交
4930 4931
  setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
4932 4933
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamIntervalAgg, NULL,
                                         destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL);
4934

L
liuyao 已提交
4935
  initIntervalDownStream(downstream, pPhyNode->type, pInfo);
4936 4937 4938 4939 4940 4941 4942 4943
  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
4944
  destroyStreamFinalIntervalOperatorInfo(pInfo);
4945 4946 4947 4948
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}