timesliceoperator.c 34.3 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15
#include "executorInt.h"
H
Haojun Liao 已提交
16 17 18
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
19 20
#include "operator.h"
#include "querytask.h"
21
#include "storageapi.h"
H
Haojun Liao 已提交
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
#include "tcommon.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "tfill.h"
#include "ttime.h"

typedef struct STimeSliceOperatorInfo {
  SSDataBlock*         pRes;
  STimeWindow          win;
  SInterval            interval;
  int64_t              current;
  SArray*              pPrevRow;     // SArray<SGroupValue>
  SArray*              pNextRow;     // SArray<SGroupValue>
  SArray*              pLinearInfo;  // SArray<SFillLinearInfo>
  bool                 isPrevRowSet;
  bool                 isNextRowSet;
  int32_t              fillType;      // fill type
  SColumn              tsCol;         // primary timestamp column
  SExprSupp            scalarSup;     // scalar calculation
  struct SFillColInfo* pFillColInfo;  // fill column info
G
Ganlin Zhao 已提交
42 43
  int64_t              prevTs;
  bool                 prevTsSet;
G
Ganlin Zhao 已提交
44
  uint64_t             groupId;
G
Ganlin Zhao 已提交
45
  SGroupKeys*          pPrevGroupKey;
G
Ganlin Zhao 已提交
46
  SSDataBlock*         pNextGroupRes;
47
  SSDataBlock*         pRemainRes;    // save block unfinished processing
G
Ganlin Zhao 已提交
48
  int32_t              remainIndex;     // the remaining index in the block to be processed
H
Haojun Liao 已提交
49 50 51 52 53 54 55 56 57 58 59 60 61
} STimeSliceOperatorInfo;

// Forward declaration; the definition lies outside this section of the file.
// NOTE(review): presumably releases an STimeSliceOperatorInfo passed as `param` - confirm against the definition.
static void destroyTimeSliceOperatorInfo(void* param);

// Snapshot row `rowIndex` of `pBlock` into pSliceInfo->pPrevRow (one SGroupKeys entry per column)
// and mark the "previous row" as available.
//
// pSliceInfo  operator state; pPrevRow must already hold one entry per column of pBlock
// pBlock      source data block
// rowIndex    row of pBlock to capture
static void doKeepPrevRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex) {
  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    SGroupKeys*      pkey = taosArrayGet(pSliceInfo->pPrevRow, i);

    if (colDataIsNull_s(pColInfoData, rowIndex)) {
      pkey->isNull = true;
      continue;
    }

    pkey->isNull = false;
    char* val = colDataGetData(pColInfoData, rowIndex);
    if (IS_VAR_DATA_TYPE(pkey->type)) {
      // Copy the length header together with the payload; copying only varDataLen() bytes
      // would drop the tail of the value.
      memcpy(pkey->pData, val, varDataTLen(val));
    } else {
      memcpy(pkey->pData, val, pkey->bytes);
    }
  }

  pSliceInfo->isPrevRowSet = true;
}

// Snapshot row `rowIndex` of `pBlock` into pSliceInfo->pNextRow (one SGroupKeys entry per column)
// and mark the "next row" as available.
//
// pSliceInfo  operator state; pNextRow must already hold one entry per column of pBlock
// pBlock      source data block
// rowIndex    row of pBlock to capture
static void doKeepNextRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex) {
  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    SGroupKeys*      pkey = taosArrayGet(pSliceInfo->pNextRow, i);

    if (colDataIsNull_s(pColInfoData, rowIndex)) {
      pkey->isNull = true;
      continue;
    }

    pkey->isNull = false;
    char* val = colDataGetData(pColInfoData, rowIndex);
    if (IS_VAR_DATA_TYPE(pkey->type)) {
      // Copy the length header together with the payload; copying only varDataLen() bytes
      // would drop the tail of the value.
      memcpy(pkey->pData, val, varDataTLen(val));
    } else {
      memcpy(pkey->pData, val, pkey->bytes);
    }
  }

  pSliceInfo->isNextRowSet = true;
}

static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex) {
  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId);
    SFillLinearInfo* pLinearInfo = taosArrayGet(pSliceInfo->pLinearInfo, i);

104 105 106 107
    if (!IS_MATHABLE_TYPE(pColInfoData->info.type)) {
      continue;
    }

H
Haojun Liao 已提交
108 109 110 111 112
    // null value is represented by using key = INT64_MIN for now.
    // TODO: optimize to ignore null values for linear interpolation.
    if (!pLinearInfo->isStartSet) {
      if (!colDataIsNull_s(pColInfoData, rowIndex)) {
        pLinearInfo->start.key = *(int64_t*)colDataGetData(pTsCol, rowIndex);
113 114 115 116 117 118 119
        char* p = colDataGetData(pColInfoData, rowIndex);
        if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
          ASSERT(varDataTLen(p) <= pColInfoData->info.bytes);
          memcpy(pLinearInfo->start.val, p, varDataTLen(p));
        } else {
          memcpy(pLinearInfo->start.val, p, pLinearInfo->bytes);
        }
H
Haojun Liao 已提交
120 121 122 123 124
      }
      pLinearInfo->isStartSet = true;
    } else if (!pLinearInfo->isEndSet) {
      if (!colDataIsNull_s(pColInfoData, rowIndex)) {
        pLinearInfo->end.key = *(int64_t*)colDataGetData(pTsCol, rowIndex);
125 126 127 128

        char* p = colDataGetData(pColInfoData, rowIndex);
        if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
          ASSERT(varDataTLen(p) <= pColInfoData->info.bytes);
H
Haojun Liao 已提交
129
          memcpy(pLinearInfo->end.val, p, varDataTLen(p));
130
        } else {
H
Haojun Liao 已提交
131
          memcpy(pLinearInfo->end.val, p, pLinearInfo->bytes);
132
        }
H
Haojun Liao 已提交
133 134 135 136 137 138 139 140
      }
      pLinearInfo->isEndSet = true;
    } else {
      pLinearInfo->start.key = pLinearInfo->end.key;
      memcpy(pLinearInfo->start.val, pLinearInfo->end.val, pLinearInfo->bytes);

      if (!colDataIsNull_s(pColInfoData, rowIndex)) {
        pLinearInfo->end.key = *(int64_t*)colDataGetData(pTsCol, rowIndex);
141 142 143 144

        char* p = colDataGetData(pColInfoData, rowIndex);
        if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
          ASSERT(varDataTLen(p) <= pColInfoData->info.bytes);
H
Haojun Liao 已提交
145
          memcpy(pLinearInfo->end.val, p, varDataTLen(p));
146
        } else {
H
Haojun Liao 已提交
147
          memcpy(pLinearInfo->end.val, p, pLinearInfo->bytes);
148 149
        }

H
Haojun Liao 已提交
150 151 152 153 154 155 156
      } else {
        pLinearInfo->end.key = INT64_MIN;
      }
    }
  }
}

D
dapan1121 已提交
157 158 159 160 161 162 163 164 165 166 167 168
// Grow `pBlock` when it has no free row left.
//
// The growth step is a quarter of the estimated number of windows in the interpolation range
// plus one, capped at 1048576 rows. Nothing is done while rows < capacity.
//
// Returns TSDB_CODE_SUCCESS when no growth is needed, otherwise the result of
// blockDataEnsureCapacity().
static FORCE_INLINE int32_t timeSliceEnsureBlockCapacity(STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock) {
  if (pBlock->info.rows < pBlock->info.capacity) {
    return TSDB_CODE_SUCCESS;
  }

  // Estimate the window count in 64 bits: a wide range with a small interval does not fit in
  // 32 bits, and an unset (non-positive) interval must not be used as a divisor.
  uint64_t winNum = 0;
  int64_t  step = pSliceInfo->interval.interval;
  if (step > 0 && pSliceInfo->win.ekey > pSliceInfo->win.skey) {
    uint64_t span = (uint64_t)pSliceInfo->win.ekey - (uint64_t)pSliceInfo->win.skey;
    winNum = span / (uint64_t)step;
  }

  uint64_t growth = TMIN(winNum / 4 + 1, 1048576);
  uint32_t newRowsNum = (uint32_t)(pBlock->info.rows + growth);

  // propagate allocation failures instead of reporting success unconditionally
  return blockDataEnsureCapacity(pBlock, newRowsNum);
}

169 170 171 172 173 174 175 176 177 178
// True when the expression yields a timestamp and its function name is "_irowts" (case-insensitive).
static bool isIrowtsPseudoColumn(SExprInfo* pExprInfo) {
  if (!IS_TIMESTAMP_TYPE(pExprInfo->base.resSchema.type)) {
    return false;
  }

  return strcasecmp(pExprInfo->pExpr->_function.functionName, "_irowts") == 0;
}

// True when the expression yields a boolean and its function name is "_isfilled" (case-insensitive).
static bool isIsfilledPseudoColumn(SExprInfo* pExprInfo) {
  if (!IS_BOOLEAN_TYPE(pExprInfo->base.resSchema.type)) {
    return false;
  }

  return strcasecmp(pExprInfo->pExpr->_function.functionName, "_isfilled") == 0;
}

G
Ganlin Zhao 已提交
179 180 181 182 183
// Report whether the timestamp at `curIndex` duplicates a neighbouring one.
//
// Timestamps beyond win.ekey are never reported. Otherwise the row is a duplicate when it
// equals the previously recorded timestamp, or when it sits exactly on win.ekey and the next
// row of the block carries the same timestamp. The recorded timestamp is updated on the way.
static bool checkDuplicateTimestamps(STimeSliceOperatorInfo* pSliceInfo, SColumnInfoData* pTsCol,
                                     int32_t curIndex, int32_t rows) {
  const int64_t ts = *(int64_t*)colDataGetData(pTsCol, curIndex);
  const int64_t upperBound = pSliceInfo->win.ekey;

  if (ts > upperBound) {
    return false;
  }

  if (pSliceInfo->prevTsSet && pSliceInfo->prevTs == ts) {
    return true;
  }

  pSliceInfo->prevTs = ts;
  pSliceInfo->prevTsSet = true;

  // look ahead only for a row lying exactly on the upper bound that has a successor
  if (ts != upperBound || curIndex >= rows - 1) {
    return false;
  }

  return *(int64_t*)colDataGetData(pTsCol, curIndex + 1) == ts;
}

G
Ganlin Zhao 已提交
205
// True when the expression's function type is FUNCTION_TYPE_INTERP.
static bool isInterpFunc(SExprInfo* pExprInfo) {
  return pExprInfo->pExpr->_function.functionType == FUNCTION_TYPE_INTERP;
}

G
Ganlin Zhao 已提交
210
// True when the expression's function type is FUNCTION_TYPE_GROUP_KEY.
static bool isGroupKeyFunc(SExprInfo* pExprInfo) {
  return pExprInfo->pExpr->_function.functionType == FUNCTION_TYPE_GROUP_KEY;
}

215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253
// Find the first interp expression that carries a value-typed parameter and return whether
// that parameter is non-zero. Returns false when no such parameter exists.
static bool getIgoreNullRes(SExprSupp* pExprSup) {
  for (int32_t exprIdx = 0; exprIdx < pExprSup->numOfExprs; ++exprIdx) {
    SExprInfo* pExpr = &pExprSup->pExprInfo[exprIdx];
    if (!isInterpFunc(pExpr)) {
      continue;
    }

    for (int32_t paramIdx = 0; paramIdx < pExpr->base.numOfParams; ++paramIdx) {
      SFunctParam* pParam = &pExpr->base.pParam[paramIdx];
      if (pParam->type != FUNC_PARAM_TYPE_VALUE) {
        continue;
      }

      return pParam->param.i != 0;
    }
  }

  return false;
}

// When `ignoreNull` is set, report whether row `index` of `pSrcBlock` is NULL in the source
// column of any interp expression. Always false when `ignoreNull` is not set.
static bool checkNullRow(SExprSupp* pExprSup, SSDataBlock* pSrcBlock, int32_t index, bool ignoreNull) {
  if (ignoreNull) {
    for (int32_t exprIdx = 0; exprIdx < pExprSup->numOfExprs; ++exprIdx) {
      SExprInfo* pExpr = &pExprSup->pExprInfo[exprIdx];
      if (!isInterpFunc(pExpr)) {
        continue;
      }

      // the first parameter of interp names the source column
      SColumnInfoData* pSrcCol = taosArrayGet(pSrcBlock->pDataBlock, pExpr->base.pParam[0].pCol->slotId);
      if (colDataIsNull_s(pSrcCol, index)) {
        return true;
      }
    }
  }

  return false;
}


X
Xiaoyu Wang 已提交
254
// Produce one filled row for timestamp pSliceInfo->current at the next free row of `pResBlock`.
//
// pSliceInfo  operator state (fill type, kept prev/next rows, linear info, stored group key)
// pExprSup    output expressions; each one maps to a destination column of pResBlock
// pResBlock   result block; its row count is incremented only when a row was produced
// pSrcBlock   source block used for group-key columns, or NULL to use the stored group key
// index       row of pSrcBlock the group key is read from
// beforeTs    true when the caller is filling ahead of the current data row
//
// Returns true when a row was appended. It also returns true, without appending anything,
// for linear fill when `beforeTs` is set and the end point is not available yet. Returns
// false when the data needed by the fill type is not available.
static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock,
                                   SSDataBlock* pSrcBlock, int32_t index, bool beforeTs) {
  int32_t rows = pResBlock->info.rows;
  if (timeSliceEnsureBlockCapacity(pSliceInfo, pResBlock) != TSDB_CODE_SUCCESS) {
    // no room for another row: do not write past the block's capacity
    return false;
  }
  // todo set the correct primary timestamp column

  // output the result
  int32_t fillColIndex = 0;
  bool    hasInterp = true;
  for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) {
    SExprInfo* pExprInfo = &pExprSup->pExprInfo[j];

    int32_t          dstSlot = pExprInfo->base.resSchema.slotId;
    SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot);

    if (isIrowtsPseudoColumn(pExprInfo)) {
      colDataSetVal(pDst, rows, (char*)&pSliceInfo->current, false);
      continue;
    } else if (isIsfilledPseudoColumn(pExprInfo)) {
      // rows generated here are always filled rows
      bool isFilled = true;
      colDataSetVal(pDst, rows, (char*)&isFilled, false);
      continue;
    } else if (!isInterpFunc(pExprInfo)) {
      if (isGroupKeyFunc(pExprInfo)) {
        if (pSrcBlock != NULL) {
          int32_t          srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
          SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot);

          if (colDataIsNull_s(pSrc, index)) {
            colDataSetNULL(pDst, rows);
            continue;
          }

          char* v = colDataGetData(pSrc, index);
          colDataSetVal(pDst, rows, v, false);
        } else {
          // use stored group key
          SGroupKeys* pkey = pSliceInfo->pPrevGroupKey;
          if (pkey->isNull == false) {
            colDataSetVal(pDst, rows, pkey->pData, false);
          } else {
            colDataSetNULL(pDst, rows);
          }
        }
      }
      continue;
    }

    int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
    switch (pSliceInfo->fillType) {
      case TSDB_FILL_NULL:
      case TSDB_FILL_NULL_F: {
        colDataSetNULL(pDst, rows);
        break;
      }

      case TSDB_FILL_SET_VALUE:
      case TSDB_FILL_SET_VALUE_F: {
        SVariant* pVar = &pSliceInfo->pFillColInfo[fillColIndex].fillVal;

        // NOTE(review): a destination type matching none of the branches below (presumably
        // unsigned integers) gets no value written for this row - confirm this is intended.
        if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) {
          float v = 0;
          if (!IS_VAR_DATA_TYPE(pVar->nType)) {
            GET_TYPED_DATA(v, float, pVar->nType, &pVar->i);
          } else {
            v = taosStr2Float(varDataVal(pVar->pz), NULL);
          }
          colDataSetVal(pDst, rows, (char*)&v, false);
        } else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) {
          double v = 0;
          if (!IS_VAR_DATA_TYPE(pVar->nType)) {
            GET_TYPED_DATA(v, double, pVar->nType, &pVar->i);
          } else {
            v = taosStr2Double(varDataVal(pVar->pz), NULL);
          }
          colDataSetVal(pDst, rows, (char*)&v, false);
        } else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) {
          int64_t v = 0;
          if (!IS_VAR_DATA_TYPE(pVar->nType)) {
            GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i);
          } else {
            v = taosStr2int64(varDataVal(pVar->pz));
          }
          colDataSetVal(pDst, rows, (char*)&v, false);
        } else if (IS_BOOLEAN_TYPE(pDst->info.type)) {
          bool v = false;
          if (!IS_VAR_DATA_TYPE(pVar->nType)) {
            GET_TYPED_DATA(v, bool, pVar->nType, &pVar->i);
          } else {
            v = taosStr2Int8(varDataVal(pVar->pz), NULL, 10);
          }
          colDataSetVal(pDst, rows, (char*)&v, false);
        }

        ++fillColIndex;
        break;
      }

      case TSDB_FILL_LINEAR: {
        SFillLinearInfo* pLinearInfo = taosArrayGet(pSliceInfo->pLinearInfo, srcSlot);

        SPoint start = pLinearInfo->start;
        SPoint end = pLinearInfo->end;
        SPoint current = {.key = pSliceInfo->current};

        // do not interpolate before ts range, only increase pSliceInfo->current
        if (beforeTs && !pLinearInfo->isEndSet) {
          return true;
        }

        if (!pLinearInfo->isStartSet || !pLinearInfo->isEndSet) {
          hasInterp = false;
          break;
        }

        // the end point already lies behind the requested timestamp
        if (end.key != INT64_MIN && end.key < pSliceInfo->current) {
          hasInterp = false;
          break;
        }

        // INT64_MIN marks a point whose value was NULL
        if (start.key == INT64_MIN || end.key == INT64_MIN) {
          colDataSetNULL(pDst, rows);
          break;
        }

        current.val = taosMemoryCalloc(pLinearInfo->bytes, 1);
        if (current.val == NULL) {
          // out of memory: report "no row" instead of handing a NULL buffer to the interpolation
          hasInterp = false;
          break;
        }

        taosGetLinearInterpolationVal(&current, pLinearInfo->type, &start, &end, pLinearInfo->type);
        colDataSetVal(pDst, rows, (char*)current.val, false);

        taosMemoryFree(current.val);
        break;
      }

      case TSDB_FILL_PREV: {
        if (!pSliceInfo->isPrevRowSet) {
          hasInterp = false;
          break;
        }

        SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, srcSlot);
        if (pkey->isNull == false) {
          colDataSetVal(pDst, rows, pkey->pData, false);
        } else {
          colDataSetNULL(pDst, rows);
        }
        break;
      }

      case TSDB_FILL_NEXT: {
        if (!pSliceInfo->isNextRowSet) {
          hasInterp = false;
          break;
        }

        SGroupKeys* pkey = taosArrayGet(pSliceInfo->pNextRow, srcSlot);
        if (pkey->isNull == false) {
          colDataSetVal(pDst, rows, pkey->pData, false);
        } else {
          colDataSetNULL(pDst, rows);
        }
        break;
      }

      case TSDB_FILL_NONE:
      default:
        break;
    }
  }

  // commit the row only when every interp column could be produced
  if (hasInterp) {
    pResBlock->info.rows += 1;
  }

  return hasInterp;
}

// Append source row `index` of `pSrcBlock` to `pResBlock` as a non-filled row.
// `_irowts` columns receive pSliceInfo->current, `_isfilled` columns receive false, and every
// other expression copies (or NULLs) the value of its first parameter's source column.
static void addCurrentRowToResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock,
                                  SSDataBlock* pSrcBlock, int32_t index) {
  timeSliceEnsureBlockCapacity(pSliceInfo, pResBlock);

  for (int32_t exprIdx = 0; exprIdx < pExprSup->numOfExprs; ++exprIdx) {
    SExprInfo*       pExpr = &pExprSup->pExprInfo[exprIdx];
    SColumnInfoData* pDstCol = taosArrayGet(pResBlock->pDataBlock, pExpr->base.resSchema.slotId);

    if (isIrowtsPseudoColumn(pExpr)) {
      colDataSetVal(pDstCol, pResBlock->info.rows, (char*)&pSliceInfo->current, false);
      continue;
    }

    if (isIsfilledPseudoColumn(pExpr)) {
      bool filledFlag = false;
      colDataSetVal(pDstCol, pResBlock->info.rows, (char*)&filledFlag, false);
      continue;
    }

    SColumnInfoData* pSrcCol = taosArrayGet(pSrcBlock->pDataBlock, pExpr->base.pParam[0].pCol->slotId);
    if (colDataIsNull_s(pSrcCol, index)) {
      colDataSetNULL(pDstCol, pResBlock->info.rows);
    } else {
      colDataSetVal(pDstCol, pResBlock->info.rows, colDataGetData(pSrcCol, index), false);
    }
  }

  pResBlock->info.rows += 1;
}

// Lazily create pInfo->pPrevRow: one SGroupKeys entry, with a zeroed value buffer, per column
// of `pBlock`. Does nothing when the array already exists.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when any allocation fails. On failure
// the partially built array is released and pPrevRow is left NULL, so a later call starts
// over instead of treating the incomplete array as initialized.
static int32_t initPrevRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) {
  if (pInfo->pPrevRow != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pInfo->pPrevRow = taosArrayInit(4, sizeof(SGroupKeys));
  if (pInfo->pPrevRow == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);

    SGroupKeys key = {0};
    key.bytes = pColInfo->info.bytes;
    key.type = pColInfo->info.type;
    key.isNull = false;
    key.pData = taosMemoryCalloc(1, pColInfo->info.bytes);
    if (key.pData == NULL || taosArrayPush(pInfo->pPrevRow, &key) == NULL) {
      taosMemoryFree(key.pData);
      goto _oom;
    }
  }

  pInfo->isPrevRowSet = false;

  return TSDB_CODE_SUCCESS;

_oom:
  // roll back: free the buffers of the entries pushed so far, then the array itself
  for (int32_t k = 0; k < (int32_t)taosArrayGetSize(pInfo->pPrevRow); ++k) {
    SGroupKeys* pKey = taosArrayGet(pInfo->pPrevRow, k);
    taosMemoryFree(pKey->pData);
  }
  taosArrayDestroy(pInfo->pPrevRow);
  pInfo->pPrevRow = NULL;

  return TSDB_CODE_OUT_OF_MEMORY;
}

// Lazily create pInfo->pNextRow: one SGroupKeys entry, with a zeroed value buffer, per column
// of `pBlock`. Does nothing when the array already exists.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when any allocation fails. On failure
// the partially built array is released and pNextRow is left NULL, so a later call starts
// over instead of treating the incomplete array as initialized.
static int32_t initNextRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) {
  if (pInfo->pNextRow != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pInfo->pNextRow = taosArrayInit(4, sizeof(SGroupKeys));
  if (pInfo->pNextRow == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);

    SGroupKeys key = {0};
    key.bytes = pColInfo->info.bytes;
    key.type = pColInfo->info.type;
    key.isNull = false;
    key.pData = taosMemoryCalloc(1, pColInfo->info.bytes);
    if (key.pData == NULL || taosArrayPush(pInfo->pNextRow, &key) == NULL) {
      taosMemoryFree(key.pData);
      goto _oom;
    }
  }

  pInfo->isNextRowSet = false;

  return TSDB_CODE_SUCCESS;

_oom:
  // roll back: free the buffers of the entries pushed so far, then the array itself
  for (int32_t k = 0; k < (int32_t)taosArrayGetSize(pInfo->pNextRow); ++k) {
    SGroupKeys* pKey = taosArrayGet(pInfo->pNextRow, k);
    taosMemoryFree(pKey->pData);
  }
  taosArrayDestroy(pInfo->pNextRow);
  pInfo->pNextRow = NULL;

  return TSDB_CODE_OUT_OF_MEMORY;
}

// Lazily create pInfo->pLinearInfo: one SFillLinearInfo per column of `pBlock`, with both
// points unset (key = INT64_MIN) and zeroed value buffers. Does nothing when the array exists.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when any allocation fails. On failure
// the partially built array is released and pLinearInfo is left NULL, so a later call starts
// over instead of treating the incomplete array as initialized.
static int32_t initFillLinearInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) {
  if (pInfo->pLinearInfo != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pInfo->pLinearInfo = taosArrayInit(4, sizeof(SFillLinearInfo));
  if (pInfo->pLinearInfo == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);

    SFillLinearInfo linearInfo = {0};
    linearInfo.start.key = INT64_MIN;
    linearInfo.end.key = INT64_MIN;
    linearInfo.start.val = taosMemoryCalloc(1, pColInfo->info.bytes);
    linearInfo.end.val = taosMemoryCalloc(1, pColInfo->info.bytes);
    linearInfo.isStartSet = false;
    linearInfo.isEndSet = false;
    linearInfo.type = pColInfo->info.type;
    linearInfo.bytes = pColInfo->info.bytes;

    if (linearInfo.start.val == NULL || linearInfo.end.val == NULL ||
        taosArrayPush(pInfo->pLinearInfo, &linearInfo) == NULL) {
      taosMemoryFree(linearInfo.start.val);
      taosMemoryFree(linearInfo.end.val);
      goto _oom;
    }
  }

  return TSDB_CODE_SUCCESS;

_oom:
  // roll back: free the buffers of the entries pushed so far, then the array itself
  for (int32_t k = 0; k < (int32_t)taosArrayGetSize(pInfo->pLinearInfo); ++k) {
    SFillLinearInfo* pLinear = taosArrayGet(pInfo->pLinearInfo, k);
    taosMemoryFree(pLinear->start.val);
    taosMemoryFree(pLinear->end.val);
  }
  taosArrayDestroy(pInfo->pLinearInfo);
  pInfo->pLinearInfo = NULL;

  return TSDB_CODE_OUT_OF_MEMORY;
}

G
Ganlin Zhao 已提交
545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569
// Lazily create pInfo->pPrevGroupKey and size its value buffer after the first group-key
// expression in `pExprSup`. Does nothing when the keeper already exists. When there is no
// group-key expression the keeper is allocated with a NULL value buffer.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when an allocation fails (the keeper
// is then released and left NULL).
static int32_t initGroupKeyKeeper(STimeSliceOperatorInfo* pInfo, SExprSupp* pExprSup) {
  if (pInfo->pPrevGroupKey != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pInfo->pPrevGroupKey = taosMemoryCalloc(1, sizeof(SGroupKeys));
  if (pInfo->pPrevGroupKey == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
    SExprInfo* pExprInfo = &pExprSup->pExprInfo[i];
    if (!isGroupKeyFunc(pExprInfo)) {
      continue;
    }

    pInfo->pPrevGroupKey->bytes = pExprInfo->base.resSchema.bytes;
    pInfo->pPrevGroupKey->type = pExprInfo->base.resSchema.type;
    pInfo->pPrevGroupKey->isNull = false;
    pInfo->pPrevGroupKey->pData = taosMemoryCalloc(1, pInfo->pPrevGroupKey->bytes);
    if (pInfo->pPrevGroupKey->pData == NULL) {
      taosMemoryFree(pInfo->pPrevGroupKey);
      pInfo->pPrevGroupKey = NULL;
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Stop at the first group-key expression: copyPrevGroupKey() also stores only the first
    // one, and continuing would overwrite (leak) the buffer that was just allocated.
    break;
  }

  return TSDB_CODE_SUCCESS;
}

// Initialize every keeper used by the fill modes: previous row, next row, linear-interpolation
// info and the stored group key. Each initializer does nothing once its keeper exists.
//
// Returns TSDB_CODE_SUCCESS, or the error code of the first initializer that fails (the
// specific code is passed through rather than being replaced by a generic failure).
static int32_t initKeeperInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock, SExprSupp* pExprSup) {
  int32_t code = initPrevRowsKeeper(pInfo, pBlock);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  code = initNextRowsKeeper(pInfo, pBlock);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  code = initFillLinearInfo(pInfo, pBlock);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  code = initGroupKeyKeeper(pInfo, pExprSup);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  return TSDB_CODE_SUCCESS;
}

G
Ganlin Zhao 已提交
595 596 597
// Reset the "previous row" keeper: clear the NULL flag of every kept column and mark the
// previous row as not captured. The value buffers are left untouched.
// Always returns TSDB_CODE_SUCCESS.
static int32_t resetPrevRowsKeeper(STimeSliceOperatorInfo* pInfo) {
  if (pInfo->pPrevRow == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // iterate over the array that is actually being reset
  int32_t numOfKeys = taosArrayGetSize(pInfo->pPrevRow);
  for (int32_t i = 0; i < numOfKeys; ++i) {
    SGroupKeys* pKey = taosArrayGet(pInfo->pPrevRow, i);
    pKey->isNull = false;
  }

  pInfo->isPrevRowSet = false;

  return TSDB_CODE_SUCCESS;
}
H
Haojun Liao 已提交
609

G
Ganlin Zhao 已提交
610 611 612 613
// Reset the "next row" keeper: clear the NULL flag of every kept column and mark the next row
// as not captured. The value buffers are left untouched.
// Always returns TSDB_CODE_SUCCESS.
static int32_t resetNextRowsKeeper(STimeSliceOperatorInfo* pInfo) {
  if (pInfo->pNextRow == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // reset the entries of pNextRow itself (not those of pPrevRow)
  int32_t numOfKeys = taosArrayGetSize(pInfo->pNextRow);
  for (int32_t i = 0; i < numOfKeys; ++i) {
    SGroupKeys* pKey = taosArrayGet(pInfo->pNextRow, i);
    pKey->isNull = false;
  }

  pInfo->isNextRowSet = false;

  return TSDB_CODE_SUCCESS;
}

// Put every linear-interpolation entry back into its "no points captured" state.
// Always returns TSDB_CODE_SUCCESS.
static int32_t resetFillLinearInfo(STimeSliceOperatorInfo* pInfo) {
  SArray* pEntries = pInfo->pLinearInfo;

  if (pEntries != NULL) {
    size_t numOfEntries = taosArrayGetSize(pEntries);
    for (int32_t idx = 0; idx < numOfEntries; ++idx) {
      SFillLinearInfo* pEntry = taosArrayGet(pEntries, idx);
      pEntry->isStartSet = false;
      pEntry->isEndSet = false;
      pEntry->start.key = INT64_MIN;
      pEntry->end.key = INT64_MIN;
    }
  }

  return TSDB_CODE_SUCCESS;
}

// Reset the prev-row, next-row and linear-interpolation keepers, in that order.
// The individual results are not inspected; always returns TSDB_CODE_SUCCESS.
static int32_t resetKeeperInfo(STimeSliceOperatorInfo* pInfo) {
  (void)resetPrevRowsKeeper(pInfo);
  (void)resetNextRowsKeeper(pInfo);
  (void)resetFillLinearInfo(pInfo);
  return TSDB_CODE_SUCCESS;
}

G
Ganlin Zhao 已提交
649
// True when the result block holds more than `threshold` rows.
static bool checkThresholdReached(STimeSliceOperatorInfo* pSliceInfo, int32_t threshold) {
  return pSliceInfo->pRes->info.rows > threshold;
}

// True when the next output timestamp has moved past the end of the interpolation window.
static bool checkWindowBoundReached(STimeSliceOperatorInfo* pSliceInfo) {
  return pSliceInfo->current > pSliceInfo->win.ekey;
}

G
Ganlin Zhao 已提交
666 667 668 669 670 671
// Remember where processing of `pBlock` stopped so that it can be resumed later.
// When rows remain after `curIndex`, the block and the index of the next row are stored;
// otherwise the remaining-block pointer is cleared.
static void saveBlockStatus(STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock, int32_t curIndex) {
  if (curIndex < pBlock->info.rows - 1) {
    pSliceInfo->pRemainRes = pBlock;
    pSliceInfo->remainIndex = curIndex + 1;
    return;
  }

  // all data in remaining block processed
  pSliceInfo->pRemainRes = NULL;
}

G
Ganlin Zhao 已提交
681
static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock,
682
                            SExecTaskInfo* pTaskInfo, bool ignoreNull) {
G
Ganlin Zhao 已提交
683 684 685 686 687
  SSDataBlock* pResBlock = pSliceInfo->pRes;
  SInterval*   pInterval = &pSliceInfo->interval;

  SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId);

G
Ganlin Zhao 已提交
688 689 690
  int32_t i = (pSliceInfo->pRemainRes == NULL) ? 0 : pSliceInfo->remainIndex;
  for (; i < pBlock->info.rows; ++i) {
    int64_t ts = *(int64_t*)colDataGetData(pTsCol, i);
G
Ganlin Zhao 已提交
691

G
Ganlin Zhao 已提交
692 693 694
    // check for duplicate timestamps
    if (checkDuplicateTimestamps(pSliceInfo, pTsCol, i, pBlock->info.rows)) {
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_FUNC_DUP_TIMESTAMP);
695 696
    }

697 698 699
    if (checkNullRow(&pOperator->exprSupp, pBlock, i, ignoreNull)) {
      continue;
    }
H
Haojun Liao 已提交
700

G
Ganlin Zhao 已提交
701 702
    if (ts == pSliceInfo->current) {
      addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i);
H
Haojun Liao 已提交
703

G
Ganlin Zhao 已提交
704 705
      doKeepPrevRows(pSliceInfo, pBlock, i);
      doKeepLinearInfo(pSliceInfo, pBlock, i);
H
Haojun Liao 已提交
706

G
Ganlin Zhao 已提交
707 708
      pSliceInfo->current =
          taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
709 710

      if (checkWindowBoundReached(pSliceInfo)) {
H
Haojun Liao 已提交
711 712
        break;
      }
G
Ganlin Zhao 已提交
713 714
      if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) {
        saveBlockStatus(pSliceInfo, pBlock, i);
715 716
        return;
      }
G
Ganlin Zhao 已提交
717 718 719 720
    } else if (ts < pSliceInfo->current) {
      // in case of interpolation window starts and ends between two datapoints, fill(prev) need to interpolate
      doKeepPrevRows(pSliceInfo, pBlock, i);
      doKeepLinearInfo(pSliceInfo, pBlock, i);
H
Haojun Liao 已提交
721

G
Ganlin Zhao 已提交
722 723 724 725 726 727
      if (i < pBlock->info.rows - 1) {
        // in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate
        doKeepNextRows(pSliceInfo, pBlock, i + 1);
        int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1);
        if (nextTs > pSliceInfo->current) {
          while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
G
Ganlin Zhao 已提交
728
            if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, false) &&
G
Ganlin Zhao 已提交
729
                pSliceInfo->fillType == TSDB_FILL_LINEAR) {
H
Haojun Liao 已提交
730
              break;
G
Ganlin Zhao 已提交
731 732 733
            } else {
              pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit,
                                                pInterval->precision);
H
Haojun Liao 已提交
734 735 736
            }
          }

737
          if (checkWindowBoundReached(pSliceInfo)) {
H
Haojun Liao 已提交
738 739
            break;
          }
G
Ganlin Zhao 已提交
740 741
          if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) {
            saveBlockStatus(pSliceInfo, pBlock, i);
742 743
            return;
          }
G
Ganlin Zhao 已提交
744 745
        } else {
          // ignore current row, and do nothing
H
Haojun Liao 已提交
746
        }
G
Ganlin Zhao 已提交
747 748 749 750 751 752 753 754 755
      } else {  // it is the last row of current block
        doKeepPrevRows(pSliceInfo, pBlock, i);
      }
    } else {  // ts > pSliceInfo->current
      // in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate
      doKeepNextRows(pSliceInfo, pBlock, i);
      doKeepLinearInfo(pSliceInfo, pBlock, i);

      while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) {
G
Ganlin Zhao 已提交
756
        if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, true) &&
G
Ganlin Zhao 已提交
757 758 759
            pSliceInfo->fillType == TSDB_FILL_LINEAR) {
          break;
        } else {
H
Haojun Liao 已提交
760 761 762
          pSliceInfo->current =
              taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
        }
G
Ganlin Zhao 已提交
763
      }
H
Haojun Liao 已提交
764

G
Ganlin Zhao 已提交
765 766 767 768 769 770 771 772 773
      // add current row if timestamp match
      if (ts == pSliceInfo->current && pSliceInfo->current <= pSliceInfo->win.ekey) {
        addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i);

        pSliceInfo->current =
            taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
      }
      doKeepPrevRows(pSliceInfo, pBlock, i);

774
      if (checkWindowBoundReached(pSliceInfo)) {
G
Ganlin Zhao 已提交
775
        break;
H
Haojun Liao 已提交
776
      }
G
Ganlin Zhao 已提交
777 778
      if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) {
        saveBlockStatus(pSliceInfo, pBlock, i);
779 780
        return;
      }
H
Haojun Liao 已提交
781 782
    }
  }
783 784 785 786 787

  // if reached here, meaning block processing finished naturally,
  // or interpolation reach window upper bound
  pSliceInfo->pRemainRes = NULL;

G
Ganlin Zhao 已提交
788 789
}

G
Ganlin Zhao 已提交
790
// Generate interpolated rows for the time points that remain after the input
// has been consumed, i.e. from pSliceInfo->current up to and including
// pSliceInfo->win.ekey, stepping by the configured interval.
//
// Nothing is generated for fill(next) and fill(linear); the loop guard rejects
// both fill types.
//
// pSliceInfo: operator state; `current` is advanced by this function
// pOperator:  owning operator, supplies the expression support
// index:      row index forwarded to genInterpolationResult() (no source block
//             is passed, the block argument is NULL)
static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperatorInfo* pOperator, int32_t index) {
  SSDataBlock* pResBlock = pSliceInfo->pRes;
  SInterval*   pInterval = &pSliceInfo->interval;

  while (pSliceInfo->current <= pSliceInfo->win.ekey && pSliceInfo->fillType != TSDB_FILL_NEXT &&
         pSliceInfo->fillType != TSDB_FILL_LINEAR) {
    // the boolean result is intentionally not inspected here
    genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, NULL, index, false);
    pSliceInfo->current =
        taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
  }
}

G
Ganlin Zhao 已提交
802 803 804 805 806 807 808 809 810 811 812 813
// Remember the group key of the block that was just processed.
//
// The first expression for which isGroupKeyFunc() is true is located, and the
// value in row 0 of its source column is copied into pGroupKey (or pGroupKey is
// flagged as NULL when that value is NULL). Only the first group-key expression
// is considered.
//
// pExprSup:  expressions of the operator
// pGroupKey: destination keeper; may be NULL, in which case nothing is copied
// pSrcBlock: block whose first row supplies the key value
static void copyPrevGroupKey(SExprSupp* pExprSup, SGroupKeys* pGroupKey, SSDataBlock* pSrcBlock) {
  // the keeper is optional for the operator, so there may be nothing to fill in
  if (pGroupKey == NULL) {
    return;
  }

  for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) {
    SExprInfo* pExprInfo = &pExprSup->pExprInfo[j];

    if (isGroupKeyFunc(pExprInfo)) {
      int32_t          srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
      SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot);

      if (colDataIsNull_s(pSrc, 0)) {
        pGroupKey->isNull = true;
        break;
      }

      char* v = colDataGetData(pSrc, 0);
      if (IS_VAR_DATA_TYPE(pGroupKey->type)) {
        // variable-length value: copy the total length reported by varDataTLen()
        memcpy(pGroupKey->pData, v, varDataTLen(v));
      } else {
        memcpy(pGroupKey->pData, v, pGroupKey->bytes);
      }

      pGroupKey->isNull = false;
      break;
    }
  }
}

G
Ganlin Zhao 已提交
828
// Restore the per-group state so that the next group is interpolated from the
// beginning of the window: `current` goes back to win.skey, the "previous
// timestamp seen" flag is cleared, and the row keepers are reset through
// resetKeeperInfo().
static void resetTimesliceInfo(STimeSliceOperatorInfo* pSliceInfo) {
  pSliceInfo->current = pSliceInfo->win.skey;
  pSliceInfo->prevTsSet = false;
  resetKeeperInfo(pSliceInfo);
}

834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857
// Run the time-slice computation for one input block.
//
// Steps, in order: initialise the row keepers, evaluate the optional scalar
// expressions in place on pBlock, bind pBlock as the operator input, perform
// the interpolation, and finally remember the group key of this block.
//
// Any failure is reported by a long jump to pTaskInfo->env with the error code;
// the function does not return in that case.
static void doHandleTimeslice(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  STimeSliceOperatorInfo* pSliceInfo = pOperator->info;
  SExprSupp*              pSup = &pOperator->exprSupp;
  bool                    ignoreNull = getIgoreNullRes(pSup);
  int32_t                 order = TSDB_ORDER_ASC;

  int32_t code = initKeeperInfo(pSliceInfo, pBlock, pSup);
  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, code);
  }

  if (pSliceInfo->scalarSup.pExprInfo != NULL) {
    SExprSupp* pExprSup = &pSliceInfo->scalarSup;
    // source and result block are the same: the scalar results are written into pBlock
    code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
    if (code != TSDB_CODE_SUCCESS) {
      // do not interpolate on top of a block whose scalar columns were not computed
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }

  // bind the block as the input of the interp functions
  setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true);
  doTimesliceImpl(pOperator, pSliceInfo, pBlock, pTaskInfo, ignoreNull);
  copyPrevGroupKey(pSup, pSliceInfo->pPrevGroupKey, pBlock);
}

H
Haojun Liao 已提交
858 859 860 861 862
// "Get next block" entry point of the time-slice operator.
//
// Returns the operator's result block when it contains at least one row, or
// NULL when it is empty (the operator is then marked OP_EXEC_DONE).
//
// Input is consumed group by group:
//   - a block that belongs to the next group is parked in pNextGroupRes and is
//     handled first on the next pass of the outer loop;
//   - a partially processed block kept in pRemainRes is consumed before a new
//     block is requested from downstream;
//   - when a group ends, the trailing time points are filled in and the
//     per-group state is reset.
// The function returns early as soon as the window upper bound or the result
// threshold is reached.
static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;
  STimeSliceOperatorInfo* pSliceInfo = pOperator->info;
  SSDataBlock*            pResBlock = pSliceInfo->pRes;

  SOperatorInfo* downstream = pOperator->pDownstream[0];
  blockDataCleanup(pResBlock);

  while (1) {
    // first block of a new group that was read while finishing the previous group
    if (pSliceInfo->pNextGroupRes != NULL) {
      doHandleTimeslice(pOperator, pSliceInfo->pNextGroupRes);
      if (checkWindowBoundReached(pSliceInfo) || checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) {
        doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL);
        // keep the parked block only while part of it is still pending
        if (pSliceInfo->pRemainRes == NULL) {
          pSliceInfo->pNextGroupRes = NULL;
        }
        goto _finished;
      }
      pSliceInfo->pNextGroupRes = NULL;
    }

    while (1) {
      SSDataBlock* pBlock = pSliceInfo->pRemainRes ? pSliceInfo->pRemainRes : downstream->fpSet.getNextFn(downstream);
      if (pBlock == NULL) {
        setOperatorCompleted(pOperator);
        break;
      }

      if (pSliceInfo->groupId == 0 && pBlock->info.id.groupId != 0) {
        // first block carrying a non-zero group id: adopt it
        pSliceInfo->groupId = pBlock->info.id.groupId;
      } else {
        if (pSliceInfo->groupId != pBlock->info.id.groupId) {
          // group switch: park the block and finish the current group first
          pSliceInfo->groupId = pBlock->info.id.groupId;
          pSliceInfo->pNextGroupRes = pBlock;
          break;
        }
      }

      doHandleTimeslice(pOperator, pBlock);
      if (checkWindowBoundReached(pSliceInfo) || checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) {
        doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL);
        goto _finished;
      }
    }
    // post work for a specific group

    // check if need to interpolate after last datablock
    // except for fill(next), fill(linear)
    genInterpAfterDataBlock(pSliceInfo, pOperator, 0);

    doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL);
    if (pOperator->status == OP_EXEC_DONE) {
      break;
    }

    // restore initial value for next group
    resetTimesliceInfo(pSliceInfo);
    if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) {
      goto _finished;
    }
  }

_finished:
  // NOTE(review): the task is flagged TASK_COMPLETED on every return path, including
  // the early returns taken while input may remain -- confirm this is intended.
  setTaskStatus(pTaskInfo, TASK_COMPLETED);
  if (pResBlock->info.rows == 0) {
    pOperator->status = OP_EXEC_DONE;
  }

  return pResBlock->info.rows == 0 ? NULL : pResBlock;
}

// Build the time-slice (INTERP) operator on top of `downstream`.
//
// downstream: the single input operator; when it is a table scan, the
//             interpolation range is pushed into its scan condition
// pPhyNode:   physical plan node, read as an SInterpFuncPhysiNode
// pTaskInfo:  owning task; pTaskInfo->code receives the error code on failure
//
// Returns the new operator, or NULL on failure.
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo) {
  // declared before the first goto so that the error path always reads a defined value
  int32_t code = TSDB_CODE_SUCCESS;

  STimeSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STimeSliceOperatorInfo));
  SOperatorInfo*          pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pOperator == NULL || pInfo == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  SInterpFuncPhysiNode* pInterpPhyNode = (SInterpFuncPhysiNode*)pPhyNode;
  SExprSupp*            pSup = &pOperator->exprSupp;

  int32_t    numOfExprs = 0;
  SExprInfo* pExprInfo = createExprInfo(pInterpPhyNode->pFuncs, NULL, &numOfExprs);
  code = initExprSupp(pSup, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  // optional scalar expressions evaluated on every input block before interpolation
  if (pInterpPhyNode->pExprs != NULL) {
    int32_t    num = 0;
    SExprInfo* pScalarExprInfo = createExprInfo(pInterpPhyNode->pExprs, NULL, &num);
    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, num, &pTaskInfo->storageAPI.functionStore);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

  code = filterInitFromNode((SNode*)pInterpPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->tsCol = extractColumnFromColumnNode((SColumnNode*)pInterpPhyNode->pTimeSeries);
  pInfo->fillType = convertFillType(pInterpPhyNode->fillMode);
  initResultSizeInfo(&pOperator->resultInfo, 4096);

  pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, NULL, 0, (SNodeListNode*)pInterpPhyNode->pFillValues);
  pInfo->pLinearInfo = NULL;
  pInfo->pRes = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
  if (pInfo->pRes == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  pInfo->win = pInterpPhyNode->timeRange;
  pInfo->interval.interval = pInterpPhyNode->interval;
  pInfo->current = pInfo->win.skey;
  pInfo->prevTsSet = false;
  pInfo->prevTs = 0;
  pInfo->groupId = 0;
  pInfo->pPrevGroupKey = NULL;
  pInfo->pNextGroupRes = NULL;
  pInfo->pRemainRes = NULL;
  pInfo->remainIndex = 0;

  // directly above a table scan: restrict the scan to the interpolation range
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info;
    pScanInfo->base.cond.twindows = pInfo->win;
    pScanInfo->base.cond.type = TIMEWINDOW_RANGE_EXTERNAL;
  }

  setOperatorInfo(pOperator, "TimeSliceOperator", QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
  pOperator->fpSet =
      createOperatorFpSet(optrDummyOpenFn, doTimeslice, NULL, destroyTimeSliceOperatorInfo, optrDefaultBufFn, NULL);

  code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  // doTimeslice() reads pOperator->pDownstream[0], so a failure here must not be ignored
  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  // TODO(review): only the two top-level allocations are released here; objects that
  // were already attached to them (expression info, filter info, result block, fill
  // column info) are not freed on this path.
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  // report the real failure reason instead of unconditionally claiming out-of-memory
  pTaskInfo->code = (code != TSDB_CODE_SUCCESS) ? code : TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}

void destroyTimeSliceOperatorInfo(void* param) {
  STimeSliceOperatorInfo* pInfo = (STimeSliceOperatorInfo*)param;

  pInfo->pRes = blockDataDestroy(pInfo->pRes);

  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pPrevRow); ++i) {
    SGroupKeys* pKey = taosArrayGet(pInfo->pPrevRow, i);
    taosMemoryFree(pKey->pData);
  }
  taosArrayDestroy(pInfo->pPrevRow);

  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pNextRow); ++i) {
    SGroupKeys* pKey = taosArrayGet(pInfo->pNextRow, i);
    taosMemoryFree(pKey->pData);
  }
  taosArrayDestroy(pInfo->pNextRow);

  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pLinearInfo); ++i) {
    SFillLinearInfo* pKey = taosArrayGet(pInfo->pLinearInfo, i);
    taosMemoryFree(pKey->start.val);
    taosMemoryFree(pKey->end.val);
  }
  taosArrayDestroy(pInfo->pLinearInfo);
G
Ganlin Zhao 已提交
1029

1030 1031 1032 1033
  if (pInfo->pPrevGroupKey) {
    taosMemoryFree(pInfo->pPrevGroupKey->pData);
    taosMemoryFree(pInfo->pPrevGroupKey);
  }
G
Ganlin Zhao 已提交
1034

D
dapan1121 已提交
1035
  cleanupExprSupp(&pInfo->scalarSup);
H
Haojun Liao 已提交
1036

G
Ganlin Zhao 已提交
1037 1038 1039
  for (int32_t i = 0; i < pInfo->pFillColInfo->numOfFillExpr; ++i) {
    taosVariantDestroy(&pInfo->pFillColInfo[i].fillVal);
  }
H
Haojun Liao 已提交
1040 1041 1042
  taosMemoryFree(pInfo->pFillColInfo);
  taosMemoryFreeClear(param);
}