timesliceoperator.c 35.0 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15
#include "executorInt.h"
H
Haojun Liao 已提交
16 17 18
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
19 20
#include "operator.h"
#include "querytask.h"
21
#include "storageapi.h"
H
Haojun Liao 已提交
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
#include "tcommon.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "tfill.h"
#include "ttime.h"

typedef struct STimeSliceOperatorInfo {
  SSDataBlock*         pRes;
  STimeWindow          win;
  SInterval            interval;
  int64_t              current;
  SArray*              pPrevRow;     // SArray<SGroupValue>
  SArray*              pNextRow;     // SArray<SGroupValue>
  SArray*              pLinearInfo;  // SArray<SFillLinearInfo>
  bool                 isPrevRowSet;
  bool                 isNextRowSet;
  int32_t              fillType;      // fill type
  SColumn              tsCol;         // primary timestamp column
  SExprSupp            scalarSup;     // scalar calculation
  struct SFillColInfo* pFillColInfo;  // fill column info
G
Ganlin Zhao 已提交
42 43
  int64_t              prevTs;
  bool                 prevTsSet;
G
Ganlin Zhao 已提交
44
  uint64_t             groupId;
G
Ganlin Zhao 已提交
45
  SGroupKeys*          pPrevGroupKey;
G
Ganlin Zhao 已提交
46
  SSDataBlock*         pNextGroupRes;
47
  SSDataBlock*         pRemainRes;    // save block unfinished processing
G
Ganlin Zhao 已提交
48
  int32_t              remainIndex;     // the remaining index in the block to be processed
H
Haojun Liao 已提交
49 50 51 52 53 54 55 56 57 58 59 60 61
} STimeSliceOperatorInfo;

static void destroyTimeSliceOperatorInfo(void* param);

static void doKeepPrevRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex) {
  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);

    SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, i);
    if (!colDataIsNull_s(pColInfoData, rowIndex)) {
      pkey->isNull = false;
      char* val = colDataGetData(pColInfoData, rowIndex);
H
Haojun Liao 已提交
62
      if (IS_VAR_DATA_TYPE(pkey->type)) {
H
Haojun Liao 已提交
63
        memcpy(pkey->pData, val, varDataLen(val));
H
Haojun Liao 已提交
64 65
      } else {
        memcpy(pkey->pData, val, pkey->bytes);
H
Haojun Liao 已提交
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
      }
    } else {
      pkey->isNull = true;
    }
  }

  pSliceInfo->isPrevRowSet = true;
}

static void doKeepNextRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex) {
  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);

    SGroupKeys* pkey = taosArrayGet(pSliceInfo->pNextRow, i);
    if (!colDataIsNull_s(pColInfoData, rowIndex)) {
      pkey->isNull = false;
      char* val = colDataGetData(pColInfoData, rowIndex);
      if (!IS_VAR_DATA_TYPE(pkey->type)) {
        memcpy(pkey->pData, val, pkey->bytes);
      } else {
        memcpy(pkey->pData, val, varDataLen(val));
      }
    } else {
      pkey->isNull = true;
    }
  }

  pSliceInfo->isNextRowSet = true;
}

static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex) {
  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId);
    SFillLinearInfo* pLinearInfo = taosArrayGet(pSliceInfo->pLinearInfo, i);

104 105 106 107
    if (!IS_MATHABLE_TYPE(pColInfoData->info.type)) {
      continue;
    }

H
Haojun Liao 已提交
108 109 110 111 112
    // null value is represented by using key = INT64_MIN for now.
    // TODO: optimize to ignore null values for linear interpolation.
    if (!pLinearInfo->isStartSet) {
      if (!colDataIsNull_s(pColInfoData, rowIndex)) {
        pLinearInfo->start.key = *(int64_t*)colDataGetData(pTsCol, rowIndex);
113 114 115 116 117 118 119
        char* p = colDataGetData(pColInfoData, rowIndex);
        if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
          ASSERT(varDataTLen(p) <= pColInfoData->info.bytes);
          memcpy(pLinearInfo->start.val, p, varDataTLen(p));
        } else {
          memcpy(pLinearInfo->start.val, p, pLinearInfo->bytes);
        }
H
Haojun Liao 已提交
120 121 122 123 124
      }
      pLinearInfo->isStartSet = true;
    } else if (!pLinearInfo->isEndSet) {
      if (!colDataIsNull_s(pColInfoData, rowIndex)) {
        pLinearInfo->end.key = *(int64_t*)colDataGetData(pTsCol, rowIndex);
125 126 127 128

        char* p = colDataGetData(pColInfoData, rowIndex);
        if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
          ASSERT(varDataTLen(p) <= pColInfoData->info.bytes);
H
Haojun Liao 已提交
129
          memcpy(pLinearInfo->end.val, p, varDataTLen(p));
130
        } else {
H
Haojun Liao 已提交
131
          memcpy(pLinearInfo->end.val, p, pLinearInfo->bytes);
132
        }
H
Haojun Liao 已提交
133 134 135 136 137 138 139 140
      }
      pLinearInfo->isEndSet = true;
    } else {
      pLinearInfo->start.key = pLinearInfo->end.key;
      memcpy(pLinearInfo->start.val, pLinearInfo->end.val, pLinearInfo->bytes);

      if (!colDataIsNull_s(pColInfoData, rowIndex)) {
        pLinearInfo->end.key = *(int64_t*)colDataGetData(pTsCol, rowIndex);
141 142 143 144

        char* p = colDataGetData(pColInfoData, rowIndex);
        if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
          ASSERT(varDataTLen(p) <= pColInfoData->info.bytes);
H
Haojun Liao 已提交
145
          memcpy(pLinearInfo->end.val, p, varDataTLen(p));
146
        } else {
H
Haojun Liao 已提交
147
          memcpy(pLinearInfo->end.val, p, pLinearInfo->bytes);
148 149
        }

H
Haojun Liao 已提交
150 151 152 153 154 155 156
      } else {
        pLinearInfo->end.key = INT64_MIN;
      }
    }
  }
}

D
dapan1121 已提交
157 158 159 160 161 162 163 164 165 166 167 168
static FORCE_INLINE int32_t timeSliceEnsureBlockCapacity(STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock) {
  if (pBlock->info.rows < pBlock->info.capacity) {
    return TSDB_CODE_SUCCESS;
  }

  uint32_t winNum = (pSliceInfo->win.ekey - pSliceInfo->win.skey) / pSliceInfo->interval.interval;
  uint32_t newRowsNum = pBlock->info.rows + TMIN(winNum / 4 + 1, 1048576);
  blockDataEnsureCapacity(pBlock, newRowsNum);

  return TSDB_CODE_SUCCESS;
}

169 170 171 172 173 174 175 176 177 178
static bool isIrowtsPseudoColumn(SExprInfo* pExprInfo) {
  char *name = pExprInfo->pExpr->_function.functionName;
  return (IS_TIMESTAMP_TYPE(pExprInfo->base.resSchema.type) && strcasecmp(name, "_irowts") == 0);
}

static bool isIsfilledPseudoColumn(SExprInfo* pExprInfo) {
  char *name = pExprInfo->pExpr->_function.functionName;
  return (IS_BOOLEAN_TYPE(pExprInfo->base.resSchema.type) && strcasecmp(name, "_isfilled") == 0);
}

G
Ganlin Zhao 已提交
179 180 181 182 183
static bool checkDuplicateTimestamps(STimeSliceOperatorInfo* pSliceInfo, SColumnInfoData* pTsCol,
                                     int32_t curIndex, int32_t rows) {


  int64_t currentTs = *(int64_t*)colDataGetData(pTsCol, curIndex);
G
Ganlin Zhao 已提交
184 185 186 187
  if (currentTs > pSliceInfo->win.ekey) {
    return false;
  }

G
Ganlin Zhao 已提交
188 189 190 191 192 193 194
  if ((pSliceInfo->prevTsSet == true) && (currentTs == pSliceInfo->prevTs)) {
    return true;
  }

  pSliceInfo->prevTsSet = true;
  pSliceInfo->prevTs = currentTs;

G
Ganlin Zhao 已提交
195
  if (currentTs == pSliceInfo->win.ekey && curIndex < rows - 1) {
G
Ganlin Zhao 已提交
196 197 198 199 200 201 202 203 204
    int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, curIndex + 1);
    if (currentTs == nextTs) {
      return true;
    }
  }

  return false;
}

G
Ganlin Zhao 已提交
205
static bool isInterpFunc(SExprInfo* pExprInfo) {
206 207
  int32_t functionType = pExprInfo->pExpr->_function.functionType;
  return (functionType == FUNCTION_TYPE_INTERP);
G
Ganlin Zhao 已提交
208 209
}

G
Ganlin Zhao 已提交
210
static bool isGroupKeyFunc(SExprInfo* pExprInfo) {
211 212
  int32_t functionType = pExprInfo->pExpr->_function.functionType;
  return (functionType == FUNCTION_TYPE_GROUP_KEY);
G
Ganlin Zhao 已提交
213 214
}

215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253
static bool getIgoreNullRes(SExprSupp* pExprSup) {
  for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
    SExprInfo* pExprInfo = &pExprSup->pExprInfo[i];

    if (isInterpFunc(pExprInfo)) {
      for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
        SFunctParam *pFuncParam = &pExprInfo->base.pParam[j];
        if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
          return pFuncParam->param.i ? true : false;
        }
      }
    }
  }

  return false;
}

static bool checkNullRow(SExprSupp* pExprSup, SSDataBlock* pSrcBlock, int32_t index, bool ignoreNull) {
  if (!ignoreNull) {
    return false;
  }

  for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) {
    SExprInfo* pExprInfo = &pExprSup->pExprInfo[j];

    if (isInterpFunc(pExprInfo)) {
      int32_t       srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
      SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot);

      if (colDataIsNull_s(pSrc, index)) {
        return true;
      }
    }
  }

  return false;
}


X
Xiaoyu Wang 已提交
254
static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock,
G
Ganlin Zhao 已提交
255
                                   SSDataBlock* pSrcBlock, int32_t index, bool beforeTs) {
H
Haojun Liao 已提交
256
  int32_t rows = pResBlock->info.rows;
D
dapan1121 已提交
257
  timeSliceEnsureBlockCapacity(pSliceInfo, pResBlock);
H
Haojun Liao 已提交
258 259
  // todo set the correct primary timestamp column

G
Ganlin Zhao 已提交
260

H
Haojun Liao 已提交
261
  // output the result
G
Ganlin Zhao 已提交
262 263
  int32_t fillColIndex = 0;
  bool       hasInterp = true;
H
Haojun Liao 已提交
264 265 266
  for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) {
    SExprInfo* pExprInfo = &pExprSup->pExprInfo[j];

G
Ganlin Zhao 已提交
267
    int32_t       dstSlot = pExprInfo->base.resSchema.slotId;
H
Haojun Liao 已提交
268 269
    SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot);

270
    if (isIrowtsPseudoColumn(pExprInfo)) {
271
      colDataSetVal(pDst, rows, (char*)&pSliceInfo->current, false);
H
Haojun Liao 已提交
272
      continue;
273
    } else if (isIsfilledPseudoColumn(pExprInfo)) {
274
      bool isFilled = true;
H
Haojun Liao 已提交
275
      colDataSetVal(pDst, pResBlock->info.rows, (char*)&isFilled, false);
276
      continue;
G
Ganlin Zhao 已提交
277
    } else if (!isInterpFunc(pExprInfo)) {
G
Ganlin Zhao 已提交
278
      if (isGroupKeyFunc(pExprInfo)) {
G
Ganlin Zhao 已提交
279 280 281
        if (pSrcBlock != NULL) {
          int32_t       srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
          SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot);
G
Ganlin Zhao 已提交
282

G
Ganlin Zhao 已提交
283 284 285 286
          if (colDataIsNull_s(pSrc, index)) {
            colDataSetNULL(pDst, pResBlock->info.rows);
            continue;
          }
G
Ganlin Zhao 已提交
287

G
Ganlin Zhao 已提交
288 289 290 291 292 293 294 295 296 297 298
          char* v = colDataGetData(pSrc, index);
          colDataSetVal(pDst, pResBlock->info.rows, v, false);
        } else {
          // use stored group key
          SGroupKeys* pkey = pSliceInfo->pPrevGroupKey;
          if (pkey->isNull == false) {
            colDataSetVal(pDst, rows, pkey->pData, false);
          } else {
            colDataSetNULL(pDst, rows);
          }
        }
G
Ganlin Zhao 已提交
299
      }
G
Ganlin Zhao 已提交
300
      continue;
H
Haojun Liao 已提交
301 302 303 304
    }

    int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
    switch (pSliceInfo->fillType) {
D
dapan1121 已提交
305 306
      case TSDB_FILL_NULL:
      case TSDB_FILL_NULL_F: {
307
        colDataSetNULL(pDst, rows);
H
Haojun Liao 已提交
308 309 310
        break;
      }

D
dapan1121 已提交
311 312
      case TSDB_FILL_SET_VALUE:
      case TSDB_FILL_SET_VALUE_F: {
G
Ganlin Zhao 已提交
313
        SVariant* pVar = &pSliceInfo->pFillColInfo[fillColIndex].fillVal;
H
Haojun Liao 已提交
314

315
        bool isNull = (TSDB_DATA_TYPE_NULL == pVar->nType) ? true : false;
H
Haojun Liao 已提交
316 317
        if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) {
          float v = 0;
G
Ganlin Zhao 已提交
318
          if (!IS_VAR_DATA_TYPE(pVar->nType)) {
319
            GET_TYPED_DATA(v, float, pVar->nType, &pVar->f);
G
Ganlin Zhao 已提交
320 321 322
          } else {
            v = taosStr2Float(varDataVal(pVar->pz), NULL);
          }
323
          colDataSetVal(pDst, rows, (char*)&v, isNull);
H
Haojun Liao 已提交
324 325
        } else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) {
          double v = 0;
G
Ganlin Zhao 已提交
326
          if (!IS_VAR_DATA_TYPE(pVar->nType)) {
327
            GET_TYPED_DATA(v, double, pVar->nType, &pVar->d);
G
Ganlin Zhao 已提交
328 329 330
          } else {
            v = taosStr2Double(varDataVal(pVar->pz), NULL);
          }
331
          colDataSetVal(pDst, rows, (char*)&v, isNull);
H
Haojun Liao 已提交
332 333
        } else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) {
          int64_t v = 0;
G
Ganlin Zhao 已提交
334 335 336
          if (!IS_VAR_DATA_TYPE(pVar->nType)) {
            GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i);
          } else {
337 338
            v = taosStr2Int64(varDataVal(pVar->pz), NULL, 10);
          }
339
          colDataSetVal(pDst, rows, (char*)&v, isNull);
340 341 342 343 344 345
        } else if (IS_UNSIGNED_NUMERIC_TYPE(pDst->info.type)) {
          uint64_t v = 0;
          if (!IS_VAR_DATA_TYPE(pVar->nType)) {
            GET_TYPED_DATA(v, uint64_t, pVar->nType, &pVar->u);
          } else {
            v = taosStr2UInt64(varDataVal(pVar->pz), NULL, 10);
G
Ganlin Zhao 已提交
346
          }
347
          colDataSetVal(pDst, rows, (char*)&v, isNull);
348 349 350 351 352 353 354
        } else if (IS_BOOLEAN_TYPE(pDst->info.type)) {
          bool v = false;
          if (!IS_VAR_DATA_TYPE(pVar->nType)) {
            GET_TYPED_DATA(v, bool, pVar->nType, &pVar->i);
          } else {
            v = taosStr2Int8(varDataVal(pVar->pz), NULL, 10);
          }
355
          colDataSetVal(pDst, rows, (char*)&v, isNull);
H
Haojun Liao 已提交
356
        }
G
Ganlin Zhao 已提交
357 358

        ++fillColIndex;
H
Haojun Liao 已提交
359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378
        break;
      }

      case TSDB_FILL_LINEAR: {
        SFillLinearInfo* pLinearInfo = taosArrayGet(pSliceInfo->pLinearInfo, srcSlot);

        SPoint start = pLinearInfo->start;
        SPoint end = pLinearInfo->end;
        SPoint current = {.key = pSliceInfo->current};

        // do not interpolate before ts range, only increate pSliceInfo->current
        if (beforeTs && !pLinearInfo->isEndSet) {
          return true;
        }

        if (!pLinearInfo->isStartSet || !pLinearInfo->isEndSet) {
          hasInterp = false;
          break;
        }

379 380 381 382 383
        if (end.key != INT64_MIN && end.key < pSliceInfo->current) {
          hasInterp = false;
          break;
        }

H
Haojun Liao 已提交
384
        if (start.key == INT64_MIN || end.key == INT64_MIN) {
385
          colDataSetNULL(pDst, rows);
H
Haojun Liao 已提交
386 387 388 389 390
          break;
        }

        current.val = taosMemoryCalloc(pLinearInfo->bytes, 1);
        taosGetLinearInterpolationVal(&current, pLinearInfo->type, &start, &end, pLinearInfo->type);
391
        colDataSetVal(pDst, rows, (char*)current.val, false);
H
Haojun Liao 已提交
392 393 394 395 396 397 398 399 400 401 402 403

        taosMemoryFree(current.val);
        break;
      }
      case TSDB_FILL_PREV: {
        if (!pSliceInfo->isPrevRowSet) {
          hasInterp = false;
          break;
        }

        SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, srcSlot);
        if (pkey->isNull == false) {
404
          colDataSetVal(pDst, rows, pkey->pData, false);
H
Haojun Liao 已提交
405
        } else {
406
          colDataSetNULL(pDst, rows);
H
Haojun Liao 已提交
407 408 409 410 411 412 413 414 415 416 417 418
        }
        break;
      }

      case TSDB_FILL_NEXT: {
        if (!pSliceInfo->isNextRowSet) {
          hasInterp = false;
          break;
        }

        SGroupKeys* pkey = taosArrayGet(pSliceInfo->pNextRow, srcSlot);
        if (pkey->isNull == false) {
419
          colDataSetVal(pDst, rows, pkey->pData, false);
H
Haojun Liao 已提交
420
        } else {
421
          colDataSetNULL(pDst, rows);
H
Haojun Liao 已提交
422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440
        }
        break;
      }

      case TSDB_FILL_NONE:
      default:
        break;
    }
  }

  if (hasInterp) {
    pResBlock->info.rows += 1;
  }

  return hasInterp;
}

static void addCurrentRowToResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock,
                                  SSDataBlock* pSrcBlock, int32_t index) {
D
dapan1121 已提交
441
  timeSliceEnsureBlockCapacity(pSliceInfo, pResBlock);
H
Haojun Liao 已提交
442 443 444 445 446 447
  for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) {
    SExprInfo* pExprInfo = &pExprSup->pExprInfo[j];

    int32_t          dstSlot = pExprInfo->base.resSchema.slotId;
    SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot);

448
    if (isIrowtsPseudoColumn(pExprInfo)) {
449
      colDataSetVal(pDst, pResBlock->info.rows, (char*)&pSliceInfo->current, false);
450
    } else if (isIsfilledPseudoColumn(pExprInfo)) {
451
      bool isFilled = false;
X
Xiaoyu Wang 已提交
452
      colDataSetVal(pDst, pResBlock->info.rows, (char*)&isFilled, false);
H
Haojun Liao 已提交
453
    } else {
G
Ganlin Zhao 已提交
454
      int32_t       srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
H
Haojun Liao 已提交
455 456 457
      SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot);

      if (colDataIsNull_s(pSrc, index)) {
458
        colDataSetNULL(pDst, pResBlock->info.rows);
H
Haojun Liao 已提交
459 460 461 462
        continue;
      }

      char* v = colDataGetData(pSrc, index);
463
      colDataSetVal(pDst, pResBlock->info.rows, v, false);
H
Haojun Liao 已提交
464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553
    }
  }

  pResBlock->info.rows += 1;
  return;
}

static int32_t initPrevRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) {
  if (pInfo->pPrevRow != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pInfo->pPrevRow = taosArrayInit(4, sizeof(SGroupKeys));
  if (pInfo->pPrevRow == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);

    SGroupKeys key = {0};
    key.bytes = pColInfo->info.bytes;
    key.type = pColInfo->info.type;
    key.isNull = false;
    key.pData = taosMemoryCalloc(1, pColInfo->info.bytes);
    taosArrayPush(pInfo->pPrevRow, &key);
  }

  pInfo->isPrevRowSet = false;

  return TSDB_CODE_SUCCESS;
}

static int32_t initNextRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) {
  if (pInfo->pNextRow != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pInfo->pNextRow = taosArrayInit(4, sizeof(SGroupKeys));
  if (pInfo->pNextRow == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);

    SGroupKeys key = {0};
    key.bytes = pColInfo->info.bytes;
    key.type = pColInfo->info.type;
    key.isNull = false;
    key.pData = taosMemoryCalloc(1, pColInfo->info.bytes);
    taosArrayPush(pInfo->pNextRow, &key);
  }

  pInfo->isNextRowSet = false;

  return TSDB_CODE_SUCCESS;
}

static int32_t initFillLinearInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) {
  if (pInfo->pLinearInfo != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pInfo->pLinearInfo = taosArrayInit(4, sizeof(SFillLinearInfo));
  if (pInfo->pLinearInfo == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);

    SFillLinearInfo linearInfo = {0};
    linearInfo.start.key = INT64_MIN;
    linearInfo.end.key = INT64_MIN;
    linearInfo.start.val = taosMemoryCalloc(1, pColInfo->info.bytes);
    linearInfo.end.val = taosMemoryCalloc(1, pColInfo->info.bytes);
    linearInfo.isStartSet = false;
    linearInfo.isEndSet = false;
    linearInfo.type = pColInfo->info.type;
    linearInfo.bytes = pColInfo->info.bytes;
    taosArrayPush(pInfo->pLinearInfo, &linearInfo);
  }

  return TSDB_CODE_SUCCESS;
}

G
Ganlin Zhao 已提交
554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578
static int32_t initGroupKeyKeeper(STimeSliceOperatorInfo* pInfo, SExprSupp* pExprSup) {
  if (pInfo->pPrevGroupKey != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pInfo->pPrevGroupKey = taosMemoryCalloc(1, sizeof(SGroupKeys));
  if (pInfo->pPrevGroupKey == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
    SExprInfo* pExprInfo = &pExprSup->pExprInfo[i];

    if (isGroupKeyFunc(pExprInfo)) {
      pInfo->pPrevGroupKey->bytes = pExprInfo->base.resSchema.bytes;
      pInfo->pPrevGroupKey->type = pExprInfo->base.resSchema.type;
      pInfo->pPrevGroupKey->isNull = false;
      pInfo->pPrevGroupKey->pData = taosMemoryCalloc(1, pInfo->pPrevGroupKey->bytes);
    }
  }

  return TSDB_CODE_SUCCESS;
}

static int32_t initKeeperInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock, SExprSupp* pExprSup) {
H
Haojun Liao 已提交
579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594
  int32_t code;
  code = initPrevRowsKeeper(pInfo, pBlock);
  if (code != TSDB_CODE_SUCCESS) {
    return TSDB_CODE_FAILED;
  }

  code = initNextRowsKeeper(pInfo, pBlock);
  if (code != TSDB_CODE_SUCCESS) {
    return TSDB_CODE_FAILED;
  }

  code = initFillLinearInfo(pInfo, pBlock);
  if (code != TSDB_CODE_SUCCESS) {
    return TSDB_CODE_FAILED;
  }

G
Ganlin Zhao 已提交
595 596 597 598 599 600
  code = initGroupKeyKeeper(pInfo, pExprSup);
  if (code != TSDB_CODE_SUCCESS) {
    return TSDB_CODE_FAILED;
  }


H
Haojun Liao 已提交
601 602 603
  return TSDB_CODE_SUCCESS;
}

G
Ganlin Zhao 已提交
604 605 606
static int32_t resetPrevRowsKeeper(STimeSliceOperatorInfo* pInfo) {
  if (pInfo->pPrevRow == NULL) {
    return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
607 608
  }

G
Ganlin Zhao 已提交
609 610 611 612
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pLinearInfo); ++i) {
    SGroupKeys *pKey = taosArrayGet(pInfo->pPrevRow, i);
    pKey->isNull = false;
  }
H
Haojun Liao 已提交
613

G
Ganlin Zhao 已提交
614
  pInfo->isPrevRowSet = false;
H
Haojun Liao 已提交
615

G
Ganlin Zhao 已提交
616 617
  return TSDB_CODE_SUCCESS;
}
H
Haojun Liao 已提交
618

G
Ganlin Zhao 已提交
619 620 621 622
static int32_t resetNextRowsKeeper(STimeSliceOperatorInfo* pInfo) {
  if (pInfo->pNextRow == NULL) {
    return TSDB_CODE_SUCCESS;
  }
H
Haojun Liao 已提交
623

G
Ganlin Zhao 已提交
624 625 626 627 628 629
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pLinearInfo); ++i) {
    SGroupKeys *pKey = taosArrayGet(pInfo->pPrevRow, i);
    pKey->isNull = false;
  }

  pInfo->isNextRowSet = false;
H
Haojun Liao 已提交
630

G
Ganlin Zhao 已提交
631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657
  return TSDB_CODE_SUCCESS;
}

static int32_t resetFillLinearInfo(STimeSliceOperatorInfo* pInfo) {
  if (pInfo->pLinearInfo == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pLinearInfo); ++i) {
    SFillLinearInfo *pLinearInfo = taosArrayGet(pInfo->pLinearInfo, i);
    pLinearInfo->start.key = INT64_MIN;
    pLinearInfo->end.key = INT64_MIN;
    pLinearInfo->isStartSet = false;
    pLinearInfo->isEndSet = false;
  }

  return TSDB_CODE_SUCCESS;
}

static int32_t resetKeeperInfo(STimeSliceOperatorInfo* pInfo) {
  resetPrevRowsKeeper(pInfo);
  resetNextRowsKeeper(pInfo);
  resetFillLinearInfo(pInfo);

  return TSDB_CODE_SUCCESS;
}

G
Ganlin Zhao 已提交
658
static bool checkThresholdReached(STimeSliceOperatorInfo* pSliceInfo, int32_t threshold) {
659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674
  SSDataBlock* pResBlock = pSliceInfo->pRes;
  if (pResBlock->info.rows > threshold) {
    return true;
  }

  return false;
}

static bool checkWindowBoundReached(STimeSliceOperatorInfo* pSliceInfo) {
  if (pSliceInfo->current > pSliceInfo->win.ekey) {
    return true;
  }

  return false;
}

G
Ganlin Zhao 已提交
675 676 677 678 679 680
static void saveBlockStatus(STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock, int32_t curIndex) {
  SSDataBlock* pResBlock = pSliceInfo->pRes;

  SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId);
  if (curIndex < pBlock->info.rows - 1) {
    pSliceInfo->pRemainRes = pBlock;
G
Ganlin Zhao 已提交
681 682
    pSliceInfo->remainIndex = curIndex + 1;
    return;
G
Ganlin Zhao 已提交
683
  }
G
Ganlin Zhao 已提交
684 685 686 687

  // all data in remaining block processed
  pSliceInfo->pRemainRes = NULL;

G
Ganlin Zhao 已提交
688 689
}

G
Ganlin Zhao 已提交
690
static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock,
691
                            SExecTaskInfo* pTaskInfo, bool ignoreNull) {
G
Ganlin Zhao 已提交
692 693 694 695 696
  SSDataBlock* pResBlock = pSliceInfo->pRes;
  SInterval*   pInterval = &pSliceInfo->interval;

  SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId);

G
Ganlin Zhao 已提交
697 698 699
  int32_t i = (pSliceInfo->pRemainRes == NULL) ? 0 : pSliceInfo->remainIndex;
  for (; i < pBlock->info.rows; ++i) {
    int64_t ts = *(int64_t*)colDataGetData(pTsCol, i);
G
Ganlin Zhao 已提交
700

G
Ganlin Zhao 已提交
701 702 703
    // check for duplicate timestamps
    if (checkDuplicateTimestamps(pSliceInfo, pTsCol, i, pBlock->info.rows)) {
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_FUNC_DUP_TIMESTAMP);
704 705
    }

706 707 708
    if (checkNullRow(&pOperator->exprSupp, pBlock, i, ignoreNull)) {
      continue;
    }
H
Haojun Liao 已提交
709

G
Ganlin Zhao 已提交
710 711
    if (ts == pSliceInfo->current) {
      addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i);
H
Haojun Liao 已提交
712

G
Ganlin Zhao 已提交
713 714
      doKeepPrevRows(pSliceInfo, pBlock, i);
      doKeepLinearInfo(pSliceInfo, pBlock, i);
H
Haojun Liao 已提交
715

G
Ganlin Zhao 已提交
716 717
      pSliceInfo->current =
          taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
718 719

      if (checkWindowBoundReached(pSliceInfo)) {
H
Haojun Liao 已提交
720 721
        break;
      }
G
Ganlin Zhao 已提交
722 723
      if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) {
        saveBlockStatus(pSliceInfo, pBlock, i);
724 725
        return;
      }
G
Ganlin Zhao 已提交
726 727 728 729
    } else if (ts < pSliceInfo->current) {
      // in case of interpolation window starts and ends between two datapoints, fill(prev) need to interpolate
      doKeepPrevRows(pSliceInfo, pBlock, i);
      doKeepLinearInfo(pSliceInfo, pBlock, i);
H
Haojun Liao 已提交
730

G
Ganlin Zhao 已提交
731 732 733 734 735 736
      if (i < pBlock->info.rows - 1) {
        // in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate
        doKeepNextRows(pSliceInfo, pBlock, i + 1);
        int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1);
        if (nextTs > pSliceInfo->current) {
          while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
G
Ganlin Zhao 已提交
737
            if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, false) &&
G
Ganlin Zhao 已提交
738
                pSliceInfo->fillType == TSDB_FILL_LINEAR) {
H
Haojun Liao 已提交
739
              break;
G
Ganlin Zhao 已提交
740 741 742
            } else {
              pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit,
                                                pInterval->precision);
H
Haojun Liao 已提交
743 744 745
            }
          }

746
          if (checkWindowBoundReached(pSliceInfo)) {
H
Haojun Liao 已提交
747 748
            break;
          }
G
Ganlin Zhao 已提交
749 750
          if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) {
            saveBlockStatus(pSliceInfo, pBlock, i);
751 752
            return;
          }
G
Ganlin Zhao 已提交
753 754
        } else {
          // ignore current row, and do nothing
H
Haojun Liao 已提交
755
        }
G
Ganlin Zhao 已提交
756 757 758 759 760 761 762 763 764
      } else {  // it is the last row of current block
        doKeepPrevRows(pSliceInfo, pBlock, i);
      }
    } else {  // ts > pSliceInfo->current
      // in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate
      doKeepNextRows(pSliceInfo, pBlock, i);
      doKeepLinearInfo(pSliceInfo, pBlock, i);

      while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) {
G
Ganlin Zhao 已提交
765
        if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, true) &&
G
Ganlin Zhao 已提交
766 767 768
            pSliceInfo->fillType == TSDB_FILL_LINEAR) {
          break;
        } else {
H
Haojun Liao 已提交
769 770 771
          pSliceInfo->current =
              taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
        }
G
Ganlin Zhao 已提交
772
      }
H
Haojun Liao 已提交
773

G
Ganlin Zhao 已提交
774 775 776 777 778 779 780 781 782
      // add current row if timestamp match
      if (ts == pSliceInfo->current && pSliceInfo->current <= pSliceInfo->win.ekey) {
        addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i);

        pSliceInfo->current =
            taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
      }
      doKeepPrevRows(pSliceInfo, pBlock, i);

783
      if (checkWindowBoundReached(pSliceInfo)) {
G
Ganlin Zhao 已提交
784
        break;
H
Haojun Liao 已提交
785
      }
G
Ganlin Zhao 已提交
786 787
      if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) {
        saveBlockStatus(pSliceInfo, pBlock, i);
788 789
        return;
      }
H
Haojun Liao 已提交
790 791
    }
  }
792 793 794 795 796

  // if reached here, meaning block processing finished naturally,
  // or interpolation reach window upper bound
  pSliceInfo->pRemainRes = NULL;

G
Ganlin Zhao 已提交
797 798
}

G
Ganlin Zhao 已提交
799
static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperatorInfo* pOperator, int32_t index) {
G
Ganlin Zhao 已提交
800 801
  SSDataBlock* pResBlock = pSliceInfo->pRes;
  SInterval*   pInterval = &pSliceInfo->interval;
H
Haojun Liao 已提交
802 803 804

  while (pSliceInfo->current <= pSliceInfo->win.ekey && pSliceInfo->fillType != TSDB_FILL_NEXT &&
         pSliceInfo->fillType != TSDB_FILL_LINEAR) {
G
Ganlin Zhao 已提交
805
    genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, NULL, index, false);
H
Haojun Liao 已提交
806 807 808
    pSliceInfo->current =
        taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
  }
G
Ganlin Zhao 已提交
809 810
}

G
Ganlin Zhao 已提交
811 812 813 814 815 816 817 818 819 820 821 822
static void copyPrevGroupKey(SExprSupp* pExprSup, SGroupKeys* pGroupKey, SSDataBlock* pSrcBlock) {
  for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) {
    SExprInfo* pExprInfo = &pExprSup->pExprInfo[j];

    if (isGroupKeyFunc(pExprInfo)) {
      int32_t       srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
      SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot);

      if (colDataIsNull_s(pSrc, 0)) {
        pGroupKey->isNull = true;
        break;
      }
H
Haojun Liao 已提交
823

G
Ganlin Zhao 已提交
824 825 826 827 828 829 830 831 832 833 834
      char* v = colDataGetData(pSrc, 0);
      if (IS_VAR_DATA_TYPE(pGroupKey->type)) {
        memcpy(pGroupKey->pData, v, varDataTLen(v));
      } else {
        memcpy(pGroupKey->pData, v, pGroupKey->bytes);
      }

      pGroupKey->isNull = false;
      break;
    }
  }
G
Ganlin Zhao 已提交
835 836
}

G
Ganlin Zhao 已提交
837
static void resetTimesliceInfo(STimeSliceOperatorInfo* pSliceInfo) {
G
Ganlin Zhao 已提交
838 839
  pSliceInfo->current = pSliceInfo->win.skey;
  pSliceInfo->prevTsSet = false;
G
Ganlin Zhao 已提交
840
  resetKeeperInfo(pSliceInfo);
G
Ganlin Zhao 已提交
841 842
}

843 844 845 846 847 848 849 850
static void doHandleTimeslice(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  STimeSliceOperatorInfo* pSliceInfo = pOperator->info;
  SExprSupp*              pSup = &pOperator->exprSupp;
  bool                    ignoreNull = getIgoreNullRes(pSup);
  int32_t                 order = TSDB_ORDER_ASC;

851 852 853 854
  if (checkWindowBoundReached(pSliceInfo)) {
    return;
  }

855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870
  int32_t code = initKeeperInfo(pSliceInfo, pBlock, &pOperator->exprSupp);
  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, code);
  }

  if (pSliceInfo->scalarSup.pExprInfo != NULL) {
    SExprSupp* pExprSup = &pSliceInfo->scalarSup;
    projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
  }

  // the pDataBlock are always the same one, no need to call this again
  setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true);
  doTimesliceImpl(pOperator, pSliceInfo, pBlock, pTaskInfo, ignoreNull);
  copyPrevGroupKey(&pOperator->exprSupp, pSliceInfo->pPrevGroupKey, pBlock);
}

H
Haojun Liao 已提交
871 872 873 874 875
static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

876
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;
H
Haojun Liao 已提交
877 878 879 880 881 882 883
  STimeSliceOperatorInfo* pSliceInfo = pOperator->info;
  SSDataBlock*            pResBlock = pSliceInfo->pRes;

  SOperatorInfo* downstream = pOperator->pDownstream[0];
  blockDataCleanup(pResBlock);

  while (1) {
G
Ganlin Zhao 已提交
884
    if (pSliceInfo->pNextGroupRes != NULL) {
885
      doHandleTimeslice(pOperator, pSliceInfo->pNextGroupRes);
G
Ganlin Zhao 已提交
886
      if (checkWindowBoundReached(pSliceInfo) || checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) {
887
        doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL);
G
Ganlin Zhao 已提交
888 889 890
        if (pSliceInfo->pRemainRes == NULL) {
          pSliceInfo->pNextGroupRes = NULL;
        }
G
Ganlin Zhao 已提交
891 892 893 894 895 896 897
        if (pResBlock->info.rows != 0) {
          goto _finished;
        } else {
          // after fillter if result block has 0 rows, go back to
          // process pNextGroupRes again for unfinished data
          continue;
        }
898
      }
G
Ganlin Zhao 已提交
899
      pSliceInfo->pNextGroupRes = NULL;
H
Haojun Liao 已提交
900 901
    }

G
Ganlin Zhao 已提交
902
    while (1) {
903
      SSDataBlock* pBlock = pSliceInfo->pRemainRes ? pSliceInfo->pRemainRes : downstream->fpSet.getNextFn(downstream);
G
Ganlin Zhao 已提交
904
      if (pBlock == NULL) {
G
Ganlin Zhao 已提交
905
        setOperatorCompleted(pOperator);
G
Ganlin Zhao 已提交
906 907
        break;
      }
908

W
wangjiaming0909 已提交
909
      pResBlock->info.scanFlag = pBlock->info.scanFlag;
G
Ganlin Zhao 已提交
910 911 912 913 914 915
      if (pSliceInfo->groupId == 0 && pBlock->info.id.groupId != 0) {
        pSliceInfo->groupId = pBlock->info.id.groupId;
      } else {
        if (pSliceInfo->groupId != pBlock->info.id.groupId) {
          pSliceInfo->groupId = pBlock->info.id.groupId;
          pSliceInfo->pNextGroupRes = pBlock;
G
Ganlin Zhao 已提交
916
          break;
G
Ganlin Zhao 已提交
917 918 919
        }
      }

920
      doHandleTimeslice(pOperator, pBlock);
G
Ganlin Zhao 已提交
921
      if (checkWindowBoundReached(pSliceInfo) || checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) {
922
        doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL);
G
Ganlin Zhao 已提交
923 924 925
        if (pResBlock->info.rows != 0) {
          goto _finished;
        }
G
Ganlin Zhao 已提交
926
      }
H
Haojun Liao 已提交
927
    }
G
Ganlin Zhao 已提交
928
    // post work for a specific group
H
Haojun Liao 已提交
929

G
Ganlin Zhao 已提交
930 931
    // check if need to interpolate after last datablock
    // except for fill(next), fill(linear)
G
Ganlin Zhao 已提交
932
    genInterpAfterDataBlock(pSliceInfo, pOperator, 0);
H
Haojun Liao 已提交
933

G
Ganlin Zhao 已提交
934 935 936 937
    doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL);
    if (pOperator->status == OP_EXEC_DONE) {
      break;
    }
G
Ganlin Zhao 已提交
938 939

    // restore initial value for next group
G
Ganlin Zhao 已提交
940
    resetTimesliceInfo(pSliceInfo);
G
Ganlin Zhao 已提交
941 942
    if (pResBlock->info.rows != 0) {
      break;
G
Ganlin Zhao 已提交
943
    }
G
Ganlin Zhao 已提交
944
  }
945

946
_finished:
H
Haojun Liao 已提交
947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967
  // restore the value
  setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
  if (pResBlock->info.rows == 0) {
    pOperator->status = OP_EXEC_DONE;
  }

  return pResBlock->info.rows == 0 ? NULL : pResBlock;
}

SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo) {
  STimeSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STimeSliceOperatorInfo));
  SOperatorInfo*          pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pOperator == NULL || pInfo == NULL) {
    goto _error;
  }

  SInterpFuncPhysiNode* pInterpPhyNode = (SInterpFuncPhysiNode*)pPhyNode;
  SExprSupp*            pSup = &pOperator->exprSupp;

  int32_t    numOfExprs = 0;
  SExprInfo* pExprInfo = createExprInfo(pInterpPhyNode->pFuncs, NULL, &numOfExprs);
968
  int32_t    code = initExprSupp(pSup, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore);
H
Haojun Liao 已提交
969 970 971 972 973 974 975
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  if (pInterpPhyNode->pExprs != NULL) {
    int32_t    num = 0;
    SExprInfo* pScalarExprInfo = createExprInfo(pInterpPhyNode->pExprs, NULL, &num);
976
    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, num, &pTaskInfo->storageAPI.functionStore);
H
Haojun Liao 已提交
977 978 979 980 981
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

982 983 984 985 986
  code = filterInitFromNode((SNode*)pInterpPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
987 988 989 990 991 992 993 994 995 996
  pInfo->tsCol = extractColumnFromColumnNode((SColumnNode*)pInterpPhyNode->pTimeSeries);
  pInfo->fillType = convertFillType(pInterpPhyNode->fillMode);
  initResultSizeInfo(&pOperator->resultInfo, 4096);

  pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, NULL, 0, (SNodeListNode*)pInterpPhyNode->pFillValues);
  pInfo->pLinearInfo = NULL;
  pInfo->pRes = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
  pInfo->win = pInterpPhyNode->timeRange;
  pInfo->interval.interval = pInterpPhyNode->interval;
  pInfo->current = pInfo->win.skey;
G
Ganlin Zhao 已提交
997 998
  pInfo->prevTsSet = false;
  pInfo->prevTs = 0;
G
Ganlin Zhao 已提交
999
  pInfo->groupId = 0;
G
Ganlin Zhao 已提交
1000
  pInfo->pPrevGroupKey = NULL;
G
Ganlin Zhao 已提交
1001
  pInfo->pNextGroupRes = NULL;
1002
  pInfo->pRemainRes = NULL;
G
Ganlin Zhao 已提交
1003
  pInfo->remainIndex = 0;
H
Haojun Liao 已提交
1004 1005 1006 1007 1008 1009 1010 1011 1012

  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info;
    pScanInfo->base.cond.twindows = pInfo->win;
    pScanInfo->base.cond.type = TIMEWINDOW_RANGE_EXTERNAL;
  }

  setOperatorInfo(pOperator, "TimeSliceOperator", QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
X
Xiaoyu Wang 已提交
1013 1014
  pOperator->fpSet =
      createOperatorFpSet(optrDummyOpenFn, doTimeslice, NULL, destroyTimeSliceOperatorInfo, optrDefaultBufFn, NULL);
H
Haojun Liao 已提交
1015 1016 1017 1018 1019 1020

  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

  code = appendDownstream(pOperator, &downstream, 1);
  return pOperator;

X
Xiaoyu Wang 已提交
1021
_error:
H
Haojun Liao 已提交
1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}

void destroyTimeSliceOperatorInfo(void* param) {
  STimeSliceOperatorInfo* pInfo = (STimeSliceOperatorInfo*)param;

  pInfo->pRes = blockDataDestroy(pInfo->pRes);

  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pPrevRow); ++i) {
    SGroupKeys* pKey = taosArrayGet(pInfo->pPrevRow, i);
    taosMemoryFree(pKey->pData);
  }
  taosArrayDestroy(pInfo->pPrevRow);

  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pNextRow); ++i) {
    SGroupKeys* pKey = taosArrayGet(pInfo->pNextRow, i);
    taosMemoryFree(pKey->pData);
  }
  taosArrayDestroy(pInfo->pNextRow);

  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pLinearInfo); ++i) {
    SFillLinearInfo* pKey = taosArrayGet(pInfo->pLinearInfo, i);
    taosMemoryFree(pKey->start.val);
    taosMemoryFree(pKey->end.val);
  }
  taosArrayDestroy(pInfo->pLinearInfo);
G
Ganlin Zhao 已提交
1051

1052 1053 1054 1055
  if (pInfo->pPrevGroupKey) {
    taosMemoryFree(pInfo->pPrevGroupKey->pData);
    taosMemoryFree(pInfo->pPrevGroupKey);
  }
G
Ganlin Zhao 已提交
1056

D
dapan1121 已提交
1057
  cleanupExprSupp(&pInfo->scalarSup);
H
Haojun Liao 已提交
1058

G
Ganlin Zhao 已提交
1059 1060 1061
  for (int32_t i = 0; i < pInfo->pFillColInfo->numOfFillExpr; ++i) {
    taosVariantDestroy(&pInfo->pFillColInfo[i].fillVal);
  }
H
Haojun Liao 已提交
1062 1063 1064
  taosMemoryFree(pInfo->pFillColInfo);
  taosMemoryFreeClear(param);
}