timesliceoperator.c 30.3 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15
#include "executorInt.h"
H
Haojun Liao 已提交
16 17 18
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
19 20
#include "operator.h"
#include "querytask.h"
21
#include "storageapi.h"
H
Haojun Liao 已提交
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
#include "tcommon.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "tfill.h"
#include "ttime.h"

typedef struct STimeSliceOperatorInfo {
  SSDataBlock*         pRes;
  STimeWindow          win;
  SInterval            interval;
  int64_t              current;
  SArray*              pPrevRow;     // SArray<SGroupValue>
  SArray*              pNextRow;     // SArray<SGroupValue>
  SArray*              pLinearInfo;  // SArray<SFillLinearInfo>
  bool                 isPrevRowSet;
  bool                 isNextRowSet;
  int32_t              fillType;      // fill type
  SColumn              tsCol;         // primary timestamp column
  SExprSupp            scalarSup;     // scalar calculation
  struct SFillColInfo* pFillColInfo;  // fill column info
G
Ganlin Zhao 已提交
42 43
  int64_t              prevTs;
  bool                 prevTsSet;
G
Ganlin Zhao 已提交
44
  uint64_t             groupId;
G
Ganlin Zhao 已提交
45
  SGroupKeys*          pPrevGroupKey;
G
Ganlin Zhao 已提交
46
  SSDataBlock*         pNextGroupRes;
H
Haojun Liao 已提交
47 48 49 50 51 52 53 54 55 56 57 58 59
} STimeSliceOperatorInfo;

static void destroyTimeSliceOperatorInfo(void* param);

static void doKeepPrevRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex) {
  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);

    SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, i);
    if (!colDataIsNull_s(pColInfoData, rowIndex)) {
      pkey->isNull = false;
      char* val = colDataGetData(pColInfoData, rowIndex);
H
Haojun Liao 已提交
60
      if (IS_VAR_DATA_TYPE(pkey->type)) {
H
Haojun Liao 已提交
61
        memcpy(pkey->pData, val, varDataLen(val));
H
Haojun Liao 已提交
62 63
      } else {
        memcpy(pkey->pData, val, pkey->bytes);
H
Haojun Liao 已提交
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
      }
    } else {
      pkey->isNull = true;
    }
  }

  pSliceInfo->isPrevRowSet = true;
}

static void doKeepNextRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex) {
  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);

    SGroupKeys* pkey = taosArrayGet(pSliceInfo->pNextRow, i);
    if (!colDataIsNull_s(pColInfoData, rowIndex)) {
      pkey->isNull = false;
      char* val = colDataGetData(pColInfoData, rowIndex);
      if (!IS_VAR_DATA_TYPE(pkey->type)) {
        memcpy(pkey->pData, val, pkey->bytes);
      } else {
        memcpy(pkey->pData, val, varDataLen(val));
      }
    } else {
      pkey->isNull = true;
    }
  }

  pSliceInfo->isNextRowSet = true;
}

static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex) {
  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId);
    SFillLinearInfo* pLinearInfo = taosArrayGet(pSliceInfo->pLinearInfo, i);

102 103 104 105
    if (!IS_MATHABLE_TYPE(pColInfoData->info.type)) {
      continue;
    }

H
Haojun Liao 已提交
106 107 108 109 110
    // null value is represented by using key = INT64_MIN for now.
    // TODO: optimize to ignore null values for linear interpolation.
    if (!pLinearInfo->isStartSet) {
      if (!colDataIsNull_s(pColInfoData, rowIndex)) {
        pLinearInfo->start.key = *(int64_t*)colDataGetData(pTsCol, rowIndex);
111 112 113 114 115 116 117
        char* p = colDataGetData(pColInfoData, rowIndex);
        if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
          ASSERT(varDataTLen(p) <= pColInfoData->info.bytes);
          memcpy(pLinearInfo->start.val, p, varDataTLen(p));
        } else {
          memcpy(pLinearInfo->start.val, p, pLinearInfo->bytes);
        }
H
Haojun Liao 已提交
118 119 120 121 122
      }
      pLinearInfo->isStartSet = true;
    } else if (!pLinearInfo->isEndSet) {
      if (!colDataIsNull_s(pColInfoData, rowIndex)) {
        pLinearInfo->end.key = *(int64_t*)colDataGetData(pTsCol, rowIndex);
123 124 125 126

        char* p = colDataGetData(pColInfoData, rowIndex);
        if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
          ASSERT(varDataTLen(p) <= pColInfoData->info.bytes);
H
Haojun Liao 已提交
127
          memcpy(pLinearInfo->end.val, p, varDataTLen(p));
128
        } else {
H
Haojun Liao 已提交
129
          memcpy(pLinearInfo->end.val, p, pLinearInfo->bytes);
130
        }
H
Haojun Liao 已提交
131 132 133 134 135 136 137 138
      }
      pLinearInfo->isEndSet = true;
    } else {
      pLinearInfo->start.key = pLinearInfo->end.key;
      memcpy(pLinearInfo->start.val, pLinearInfo->end.val, pLinearInfo->bytes);

      if (!colDataIsNull_s(pColInfoData, rowIndex)) {
        pLinearInfo->end.key = *(int64_t*)colDataGetData(pTsCol, rowIndex);
139 140 141 142

        char* p = colDataGetData(pColInfoData, rowIndex);
        if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
          ASSERT(varDataTLen(p) <= pColInfoData->info.bytes);
H
Haojun Liao 已提交
143
          memcpy(pLinearInfo->end.val, p, varDataTLen(p));
144
        } else {
H
Haojun Liao 已提交
145
          memcpy(pLinearInfo->end.val, p, pLinearInfo->bytes);
146 147
        }

H
Haojun Liao 已提交
148 149 150 151 152 153 154
      } else {
        pLinearInfo->end.key = INT64_MIN;
      }
    }
  }
}

D
dapan1121 已提交
155 156 157 158 159 160 161 162 163 164 165 166
static FORCE_INLINE int32_t timeSliceEnsureBlockCapacity(STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock) {
  if (pBlock->info.rows < pBlock->info.capacity) {
    return TSDB_CODE_SUCCESS;
  }

  uint32_t winNum = (pSliceInfo->win.ekey - pSliceInfo->win.skey) / pSliceInfo->interval.interval;
  uint32_t newRowsNum = pBlock->info.rows + TMIN(winNum / 4 + 1, 1048576);
  blockDataEnsureCapacity(pBlock, newRowsNum);

  return TSDB_CODE_SUCCESS;
}

167 168 169 170 171 172 173 174 175 176
static bool isIrowtsPseudoColumn(SExprInfo* pExprInfo) {
  char *name = pExprInfo->pExpr->_function.functionName;
  return (IS_TIMESTAMP_TYPE(pExprInfo->base.resSchema.type) && strcasecmp(name, "_irowts") == 0);
}

static bool isIsfilledPseudoColumn(SExprInfo* pExprInfo) {
  char *name = pExprInfo->pExpr->_function.functionName;
  return (IS_BOOLEAN_TYPE(pExprInfo->base.resSchema.type) && strcasecmp(name, "_isfilled") == 0);
}

G
Ganlin Zhao 已提交
177 178 179 180 181
static bool checkDuplicateTimestamps(STimeSliceOperatorInfo* pSliceInfo, SColumnInfoData* pTsCol,
                                     int32_t curIndex, int32_t rows) {


  int64_t currentTs = *(int64_t*)colDataGetData(pTsCol, curIndex);
G
Ganlin Zhao 已提交
182 183 184 185
  if (currentTs > pSliceInfo->win.ekey) {
    return false;
  }

G
Ganlin Zhao 已提交
186 187 188 189 190 191 192
  if ((pSliceInfo->prevTsSet == true) && (currentTs == pSliceInfo->prevTs)) {
    return true;
  }

  pSliceInfo->prevTsSet = true;
  pSliceInfo->prevTs = currentTs;

G
Ganlin Zhao 已提交
193
  if (currentTs == pSliceInfo->win.ekey && curIndex < rows - 1) {
G
Ganlin Zhao 已提交
194 195 196 197 198 199 200 201 202
    int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, curIndex + 1);
    if (currentTs == nextTs) {
      return true;
    }
  }

  return false;
}

G
Ganlin Zhao 已提交
203
static bool isInterpFunc(SExprInfo* pExprInfo) {
204 205
  int32_t functionType = pExprInfo->pExpr->_function.functionType;
  return (functionType == FUNCTION_TYPE_INTERP);
G
Ganlin Zhao 已提交
206 207
}

G
Ganlin Zhao 已提交
208
static bool isGroupKeyFunc(SExprInfo* pExprInfo) {
209 210
  int32_t functionType = pExprInfo->pExpr->_function.functionType;
  return (functionType == FUNCTION_TYPE_GROUP_KEY);
G
Ganlin Zhao 已提交
211 212
}

X
Xiaoyu Wang 已提交
213
static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock,
G
Ganlin Zhao 已提交
214
                                   SSDataBlock* pSrcBlock, int32_t index, bool beforeTs) {
H
Haojun Liao 已提交
215
  int32_t rows = pResBlock->info.rows;
D
dapan1121 已提交
216
  timeSliceEnsureBlockCapacity(pSliceInfo, pResBlock);
H
Haojun Liao 已提交
217 218
  // todo set the correct primary timestamp column

G
Ganlin Zhao 已提交
219

H
Haojun Liao 已提交
220 221 222 223 224
  // output the result
  bool hasInterp = true;
  for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) {
    SExprInfo* pExprInfo = &pExprSup->pExprInfo[j];

G
Ganlin Zhao 已提交
225
    int32_t       dstSlot = pExprInfo->base.resSchema.slotId;
H
Haojun Liao 已提交
226 227
    SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot);

228
    if (isIrowtsPseudoColumn(pExprInfo)) {
229
      colDataSetVal(pDst, rows, (char*)&pSliceInfo->current, false);
H
Haojun Liao 已提交
230
      continue;
231
    } else if (isIsfilledPseudoColumn(pExprInfo)) {
232 233 234
      bool isFilled = true;
      colDataAppend(pDst, pResBlock->info.rows, (char*)&isFilled, false);
      continue;
G
Ganlin Zhao 已提交
235
    } else if (!isInterpFunc(pExprInfo)) {
G
Ganlin Zhao 已提交
236
      if (isGroupKeyFunc(pExprInfo)) {
G
Ganlin Zhao 已提交
237 238 239
        if (pSrcBlock != NULL) {
          int32_t       srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
          SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot);
G
Ganlin Zhao 已提交
240

G
Ganlin Zhao 已提交
241 242 243 244
          if (colDataIsNull_s(pSrc, index)) {
            colDataSetNULL(pDst, pResBlock->info.rows);
            continue;
          }
G
Ganlin Zhao 已提交
245

G
Ganlin Zhao 已提交
246 247 248 249 250 251 252 253 254 255 256
          char* v = colDataGetData(pSrc, index);
          colDataSetVal(pDst, pResBlock->info.rows, v, false);
        } else {
          // use stored group key
          SGroupKeys* pkey = pSliceInfo->pPrevGroupKey;
          if (pkey->isNull == false) {
            colDataSetVal(pDst, rows, pkey->pData, false);
          } else {
            colDataSetNULL(pDst, rows);
          }
        }
G
Ganlin Zhao 已提交
257
      }
G
Ganlin Zhao 已提交
258
      continue;
H
Haojun Liao 已提交
259 260 261 262
    }

    int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
    switch (pSliceInfo->fillType) {
D
dapan1121 已提交
263 264
      case TSDB_FILL_NULL:
      case TSDB_FILL_NULL_F: {
265
        colDataSetNULL(pDst, rows);
H
Haojun Liao 已提交
266 267 268
        break;
      }

D
dapan1121 已提交
269 270
      case TSDB_FILL_SET_VALUE:
      case TSDB_FILL_SET_VALUE_F: {
H
Haojun Liao 已提交
271 272 273 274 275
        SVariant* pVar = &pSliceInfo->pFillColInfo[j].fillVal;

        if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) {
          float v = 0;
          GET_TYPED_DATA(v, float, pVar->nType, &pVar->i);
276
          colDataSetVal(pDst, rows, (char*)&v, false);
H
Haojun Liao 已提交
277 278 279
        } else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) {
          double v = 0;
          GET_TYPED_DATA(v, double, pVar->nType, &pVar->i);
280
          colDataSetVal(pDst, rows, (char*)&v, false);
H
Haojun Liao 已提交
281 282 283
        } else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) {
          int64_t v = 0;
          GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i);
284
          colDataSetVal(pDst, rows, (char*)&v, false);
285 286 287 288 289 290 291 292
        } else if (IS_BOOLEAN_TYPE(pDst->info.type)) {
          bool v = false;
          if (!IS_VAR_DATA_TYPE(pVar->nType)) {
            GET_TYPED_DATA(v, bool, pVar->nType, &pVar->i);
          } else {
            v = taosStr2Int8(varDataVal(pVar->pz), NULL, 10);
          }
          colDataSetVal(pDst, rows, (char*)&v, false);
H
Haojun Liao 已提交
293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313
        }
        break;
      }

      case TSDB_FILL_LINEAR: {
        SFillLinearInfo* pLinearInfo = taosArrayGet(pSliceInfo->pLinearInfo, srcSlot);

        SPoint start = pLinearInfo->start;
        SPoint end = pLinearInfo->end;
        SPoint current = {.key = pSliceInfo->current};

        // do not interpolate before ts range, only increate pSliceInfo->current
        if (beforeTs && !pLinearInfo->isEndSet) {
          return true;
        }

        if (!pLinearInfo->isStartSet || !pLinearInfo->isEndSet) {
          hasInterp = false;
          break;
        }

314 315 316 317 318
        if (end.key != INT64_MIN && end.key < pSliceInfo->current) {
          hasInterp = false;
          break;
        }

H
Haojun Liao 已提交
319
        if (start.key == INT64_MIN || end.key == INT64_MIN) {
320
          colDataSetNULL(pDst, rows);
H
Haojun Liao 已提交
321 322 323 324 325
          break;
        }

        current.val = taosMemoryCalloc(pLinearInfo->bytes, 1);
        taosGetLinearInterpolationVal(&current, pLinearInfo->type, &start, &end, pLinearInfo->type);
326
        colDataSetVal(pDst, rows, (char*)current.val, false);
H
Haojun Liao 已提交
327 328 329 330 331 332 333 334 335 336 337 338

        taosMemoryFree(current.val);
        break;
      }
      case TSDB_FILL_PREV: {
        if (!pSliceInfo->isPrevRowSet) {
          hasInterp = false;
          break;
        }

        SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, srcSlot);
        if (pkey->isNull == false) {
339
          colDataSetVal(pDst, rows, pkey->pData, false);
H
Haojun Liao 已提交
340
        } else {
341
          colDataSetNULL(pDst, rows);
H
Haojun Liao 已提交
342 343 344 345 346 347 348 349 350 351 352 353
        }
        break;
      }

      case TSDB_FILL_NEXT: {
        if (!pSliceInfo->isNextRowSet) {
          hasInterp = false;
          break;
        }

        SGroupKeys* pkey = taosArrayGet(pSliceInfo->pNextRow, srcSlot);
        if (pkey->isNull == false) {
354
          colDataSetVal(pDst, rows, pkey->pData, false);
H
Haojun Liao 已提交
355
        } else {
356
          colDataSetNULL(pDst, rows);
H
Haojun Liao 已提交
357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375
        }
        break;
      }

      case TSDB_FILL_NONE:
      default:
        break;
    }
  }

  if (hasInterp) {
    pResBlock->info.rows += 1;
  }

  return hasInterp;
}

static void addCurrentRowToResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock,
                                  SSDataBlock* pSrcBlock, int32_t index) {
D
dapan1121 已提交
376
  timeSliceEnsureBlockCapacity(pSliceInfo, pResBlock);
H
Haojun Liao 已提交
377 378 379 380 381 382
  for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) {
    SExprInfo* pExprInfo = &pExprSup->pExprInfo[j];

    int32_t          dstSlot = pExprInfo->base.resSchema.slotId;
    SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot);

383
    if (isIrowtsPseudoColumn(pExprInfo)) {
384
      colDataSetVal(pDst, pResBlock->info.rows, (char*)&pSliceInfo->current, false);
385
    } else if (isIsfilledPseudoColumn(pExprInfo)) {
386
      bool isFilled = false;
X
Xiaoyu Wang 已提交
387
      colDataSetVal(pDst, pResBlock->info.rows, (char*)&isFilled, false);
H
Haojun Liao 已提交
388
    } else {
G
Ganlin Zhao 已提交
389
      int32_t       srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
H
Haojun Liao 已提交
390 391 392
      SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot);

      if (colDataIsNull_s(pSrc, index)) {
393
        colDataSetNULL(pDst, pResBlock->info.rows);
H
Haojun Liao 已提交
394 395 396 397
        continue;
      }

      char* v = colDataGetData(pSrc, index);
398
      colDataSetVal(pDst, pResBlock->info.rows, v, false);
H
Haojun Liao 已提交
399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488
    }
  }

  pResBlock->info.rows += 1;
  return;
}

static int32_t initPrevRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) {
  if (pInfo->pPrevRow != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pInfo->pPrevRow = taosArrayInit(4, sizeof(SGroupKeys));
  if (pInfo->pPrevRow == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);

    SGroupKeys key = {0};
    key.bytes = pColInfo->info.bytes;
    key.type = pColInfo->info.type;
    key.isNull = false;
    key.pData = taosMemoryCalloc(1, pColInfo->info.bytes);
    taosArrayPush(pInfo->pPrevRow, &key);
  }

  pInfo->isPrevRowSet = false;

  return TSDB_CODE_SUCCESS;
}

static int32_t initNextRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) {
  if (pInfo->pNextRow != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pInfo->pNextRow = taosArrayInit(4, sizeof(SGroupKeys));
  if (pInfo->pNextRow == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);

    SGroupKeys key = {0};
    key.bytes = pColInfo->info.bytes;
    key.type = pColInfo->info.type;
    key.isNull = false;
    key.pData = taosMemoryCalloc(1, pColInfo->info.bytes);
    taosArrayPush(pInfo->pNextRow, &key);
  }

  pInfo->isNextRowSet = false;

  return TSDB_CODE_SUCCESS;
}

static int32_t initFillLinearInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) {
  if (pInfo->pLinearInfo != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pInfo->pLinearInfo = taosArrayInit(4, sizeof(SFillLinearInfo));
  if (pInfo->pLinearInfo == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);

    SFillLinearInfo linearInfo = {0};
    linearInfo.start.key = INT64_MIN;
    linearInfo.end.key = INT64_MIN;
    linearInfo.start.val = taosMemoryCalloc(1, pColInfo->info.bytes);
    linearInfo.end.val = taosMemoryCalloc(1, pColInfo->info.bytes);
    linearInfo.isStartSet = false;
    linearInfo.isEndSet = false;
    linearInfo.type = pColInfo->info.type;
    linearInfo.bytes = pColInfo->info.bytes;
    taosArrayPush(pInfo->pLinearInfo, &linearInfo);
  }

  return TSDB_CODE_SUCCESS;
}

G
Ganlin Zhao 已提交
489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513
static int32_t initGroupKeyKeeper(STimeSliceOperatorInfo* pInfo, SExprSupp* pExprSup) {
  if (pInfo->pPrevGroupKey != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pInfo->pPrevGroupKey = taosMemoryCalloc(1, sizeof(SGroupKeys));
  if (pInfo->pPrevGroupKey == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
    SExprInfo* pExprInfo = &pExprSup->pExprInfo[i];

    if (isGroupKeyFunc(pExprInfo)) {
      pInfo->pPrevGroupKey->bytes = pExprInfo->base.resSchema.bytes;
      pInfo->pPrevGroupKey->type = pExprInfo->base.resSchema.type;
      pInfo->pPrevGroupKey->isNull = false;
      pInfo->pPrevGroupKey->pData = taosMemoryCalloc(1, pInfo->pPrevGroupKey->bytes);
    }
  }

  return TSDB_CODE_SUCCESS;
}

static int32_t initKeeperInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock, SExprSupp* pExprSup) {
H
Haojun Liao 已提交
514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529
  int32_t code;
  code = initPrevRowsKeeper(pInfo, pBlock);
  if (code != TSDB_CODE_SUCCESS) {
    return TSDB_CODE_FAILED;
  }

  code = initNextRowsKeeper(pInfo, pBlock);
  if (code != TSDB_CODE_SUCCESS) {
    return TSDB_CODE_FAILED;
  }

  code = initFillLinearInfo(pInfo, pBlock);
  if (code != TSDB_CODE_SUCCESS) {
    return TSDB_CODE_FAILED;
  }

G
Ganlin Zhao 已提交
530 531 532 533 534 535
  code = initGroupKeyKeeper(pInfo, pExprSup);
  if (code != TSDB_CODE_SUCCESS) {
    return TSDB_CODE_FAILED;
  }


H
Haojun Liao 已提交
536 537 538
  return TSDB_CODE_SUCCESS;
}

G
Ganlin Zhao 已提交
539 540 541
static int32_t resetPrevRowsKeeper(STimeSliceOperatorInfo* pInfo) {
  if (pInfo->pPrevRow == NULL) {
    return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
542 543
  }

G
Ganlin Zhao 已提交
544 545 546 547
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pLinearInfo); ++i) {
    SGroupKeys *pKey = taosArrayGet(pInfo->pPrevRow, i);
    pKey->isNull = false;
  }
H
Haojun Liao 已提交
548

G
Ganlin Zhao 已提交
549
  pInfo->isPrevRowSet = false;
H
Haojun Liao 已提交
550

G
Ganlin Zhao 已提交
551 552
  return TSDB_CODE_SUCCESS;
}
H
Haojun Liao 已提交
553

G
Ganlin Zhao 已提交
554 555 556 557
static int32_t resetNextRowsKeeper(STimeSliceOperatorInfo* pInfo) {
  if (pInfo->pNextRow == NULL) {
    return TSDB_CODE_SUCCESS;
  }
H
Haojun Liao 已提交
558

G
Ganlin Zhao 已提交
559 560 561 562 563 564
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pLinearInfo); ++i) {
    SGroupKeys *pKey = taosArrayGet(pInfo->pPrevRow, i);
    pKey->isNull = false;
  }

  pInfo->isNextRowSet = false;
H
Haojun Liao 已提交
565

G
Ganlin Zhao 已提交
566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592
  return TSDB_CODE_SUCCESS;
}

static int32_t resetFillLinearInfo(STimeSliceOperatorInfo* pInfo) {
  if (pInfo->pLinearInfo == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pLinearInfo); ++i) {
    SFillLinearInfo *pLinearInfo = taosArrayGet(pInfo->pLinearInfo, i);
    pLinearInfo->start.key = INT64_MIN;
    pLinearInfo->end.key = INT64_MIN;
    pLinearInfo->isStartSet = false;
    pLinearInfo->isEndSet = false;
  }

  return TSDB_CODE_SUCCESS;
}

static int32_t resetKeeperInfo(STimeSliceOperatorInfo* pInfo) {
  resetPrevRowsKeeper(pInfo);
  resetNextRowsKeeper(pInfo);
  resetFillLinearInfo(pInfo);

  return TSDB_CODE_SUCCESS;
}

G
Ganlin Zhao 已提交
593 594 595 596 597 598 599 600 601 602 603 604
static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock,
                            SExecTaskInfo* pTaskInfo) {
  SSDataBlock* pResBlock = pSliceInfo->pRes;
  SInterval*   pInterval = &pSliceInfo->interval;

  SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId);
  for (int32_t i = 0; i < pBlock->info.rows; ++i) {
    int64_t ts = *(int64_t*)colDataGetData(pTsCol, i);

    // check for duplicate timestamps
    if (checkDuplicateTimestamps(pSliceInfo, pTsCol, i, pBlock->info.rows)) {
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_FUNC_DUP_TIMESTAMP);
605 606
    }

G
Ganlin Zhao 已提交
607 608
    if (pSliceInfo->current > pSliceInfo->win.ekey) {
      break;
H
Haojun Liao 已提交
609 610
    }

G
Ganlin Zhao 已提交
611 612
    if (ts == pSliceInfo->current) {
      addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i);
H
Haojun Liao 已提交
613

G
Ganlin Zhao 已提交
614 615
      doKeepPrevRows(pSliceInfo, pBlock, i);
      doKeepLinearInfo(pSliceInfo, pBlock, i);
H
Haojun Liao 已提交
616

G
Ganlin Zhao 已提交
617 618
      pSliceInfo->current =
          taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
H
Haojun Liao 已提交
619 620 621
      if (pSliceInfo->current > pSliceInfo->win.ekey) {
        break;
      }
G
Ganlin Zhao 已提交
622 623 624 625
    } else if (ts < pSliceInfo->current) {
      // in case of interpolation window starts and ends between two datapoints, fill(prev) need to interpolate
      doKeepPrevRows(pSliceInfo, pBlock, i);
      doKeepLinearInfo(pSliceInfo, pBlock, i);
H
Haojun Liao 已提交
626

G
Ganlin Zhao 已提交
627 628 629 630 631 632
      if (i < pBlock->info.rows - 1) {
        // in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate
        doKeepNextRows(pSliceInfo, pBlock, i + 1);
        int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1);
        if (nextTs > pSliceInfo->current) {
          while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
G
Ganlin Zhao 已提交
633
            if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, false) &&
G
Ganlin Zhao 已提交
634
                pSliceInfo->fillType == TSDB_FILL_LINEAR) {
H
Haojun Liao 已提交
635
              break;
G
Ganlin Zhao 已提交
636 637 638
            } else {
              pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit,
                                                pInterval->precision);
H
Haojun Liao 已提交
639 640 641
            }
          }

G
Ganlin Zhao 已提交
642
          if (pSliceInfo->current > pSliceInfo->win.ekey) {
H
Haojun Liao 已提交
643 644
            break;
          }
G
Ganlin Zhao 已提交
645 646
        } else {
          // ignore current row, and do nothing
H
Haojun Liao 已提交
647
        }
G
Ganlin Zhao 已提交
648 649 650 651 652 653 654 655 656
      } else {  // it is the last row of current block
        doKeepPrevRows(pSliceInfo, pBlock, i);
      }
    } else {  // ts > pSliceInfo->current
      // in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate
      doKeepNextRows(pSliceInfo, pBlock, i);
      doKeepLinearInfo(pSliceInfo, pBlock, i);

      while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) {
G
Ganlin Zhao 已提交
657
        if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, true) &&
G
Ganlin Zhao 已提交
658 659 660
            pSliceInfo->fillType == TSDB_FILL_LINEAR) {
          break;
        } else {
H
Haojun Liao 已提交
661 662 663
          pSliceInfo->current =
              taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
        }
G
Ganlin Zhao 已提交
664
      }
H
Haojun Liao 已提交
665

G
Ganlin Zhao 已提交
666 667 668 669 670 671 672 673 674 675 676
      // add current row if timestamp match
      if (ts == pSliceInfo->current && pSliceInfo->current <= pSliceInfo->win.ekey) {
        addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i);

        pSliceInfo->current =
            taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
      }
      doKeepPrevRows(pSliceInfo, pBlock, i);

      if (pSliceInfo->current > pSliceInfo->win.ekey) {
        break;
H
Haojun Liao 已提交
677 678 679
      }
    }
  }
G
Ganlin Zhao 已提交
680 681
}

G
Ganlin Zhao 已提交
682
static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperatorInfo* pOperator, int32_t index) {
G
Ganlin Zhao 已提交
683 684
  SSDataBlock* pResBlock = pSliceInfo->pRes;
  SInterval*   pInterval = &pSliceInfo->interval;
H
Haojun Liao 已提交
685 686 687

  while (pSliceInfo->current <= pSliceInfo->win.ekey && pSliceInfo->fillType != TSDB_FILL_NEXT &&
         pSliceInfo->fillType != TSDB_FILL_LINEAR) {
G
Ganlin Zhao 已提交
688
    genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, NULL, index, false);
H
Haojun Liao 已提交
689 690 691
    pSliceInfo->current =
        taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
  }
G
Ganlin Zhao 已提交
692 693
}

G
Ganlin Zhao 已提交
694 695 696 697 698 699 700 701 702 703 704 705
static void copyPrevGroupKey(SExprSupp* pExprSup, SGroupKeys* pGroupKey, SSDataBlock* pSrcBlock) {
  for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) {
    SExprInfo* pExprInfo = &pExprSup->pExprInfo[j];

    if (isGroupKeyFunc(pExprInfo)) {
      int32_t       srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
      SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot);

      if (colDataIsNull_s(pSrc, 0)) {
        pGroupKey->isNull = true;
        break;
      }
H
Haojun Liao 已提交
706

G
Ganlin Zhao 已提交
707 708 709 710 711 712 713 714 715 716 717
      char* v = colDataGetData(pSrc, 0);
      if (IS_VAR_DATA_TYPE(pGroupKey->type)) {
        memcpy(pGroupKey->pData, v, varDataTLen(v));
      } else {
        memcpy(pGroupKey->pData, v, pGroupKey->bytes);
      }

      pGroupKey->isNull = false;
      break;
    }
  }
G
Ganlin Zhao 已提交
718 719
}

G
Ganlin Zhao 已提交
720
static void resetTimesliceInfo(STimeSliceOperatorInfo* pSliceInfo) {
G
Ganlin Zhao 已提交
721 722
  pSliceInfo->current = pSliceInfo->win.skey;
  pSliceInfo->prevTsSet = false;
G
Ganlin Zhao 已提交
723
  resetKeeperInfo(pSliceInfo);
G
Ganlin Zhao 已提交
724 725
}

H
Haojun Liao 已提交
726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743
static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  STimeSliceOperatorInfo* pSliceInfo = pOperator->info;
  SSDataBlock*            pResBlock = pSliceInfo->pRes;
  SExprSupp*              pSup = &pOperator->exprSupp;

  int32_t        order = TSDB_ORDER_ASC;
  SInterval*     pInterval = &pSliceInfo->interval;
  SOperatorInfo* downstream = pOperator->pDownstream[0];

  blockDataCleanup(pResBlock);

  while (1) {
G
Ganlin Zhao 已提交
744
    if (pSliceInfo->pNextGroupRes != NULL) {
G
Ganlin Zhao 已提交
745
      setInputDataBlock(pSup, pSliceInfo->pNextGroupRes, order, MAIN_SCAN, true);
G
Ganlin Zhao 已提交
746
      doTimesliceImpl(pOperator, pSliceInfo, pSliceInfo->pNextGroupRes, pTaskInfo);
G
Ganlin Zhao 已提交
747
      copyPrevGroupKey(&pOperator->exprSupp, pSliceInfo->pPrevGroupKey, pSliceInfo->pNextGroupRes);
G
Ganlin Zhao 已提交
748
      pSliceInfo->pNextGroupRes = NULL;
H
Haojun Liao 已提交
749 750
    }

G
Ganlin Zhao 已提交
751 752 753
    while (1) {
      SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
      if (pBlock == NULL) {
G
Ganlin Zhao 已提交
754
        setOperatorCompleted(pOperator);
G
Ganlin Zhao 已提交
755 756
        break;
      }
757

G
Ganlin Zhao 已提交
758 759 760 761 762 763
      if (pSliceInfo->groupId == 0 && pBlock->info.id.groupId != 0) {
        pSliceInfo->groupId = pBlock->info.id.groupId;
      } else {
        if (pSliceInfo->groupId != pBlock->info.id.groupId) {
          pSliceInfo->groupId = pBlock->info.id.groupId;
          pSliceInfo->pNextGroupRes = pBlock;
G
Ganlin Zhao 已提交
764
          break;
G
Ganlin Zhao 已提交
765 766 767 768 769 770 771 772
        }
      }

      if (pSliceInfo->scalarSup.pExprInfo != NULL) {
        SExprSupp* pExprSup = &pSliceInfo->scalarSup;
        projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
      }

G
Ganlin Zhao 已提交
773
      int32_t code = initKeeperInfo(pSliceInfo, pBlock, &pOperator->exprSupp);
G
Ganlin Zhao 已提交
774 775 776 777 778 779 780
      if (code != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTaskInfo->env, code);
      }

      // the pDataBlock are always the same one, no need to call this again
      setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true);
      doTimesliceImpl(pOperator, pSliceInfo, pBlock, pTaskInfo);
G
Ganlin Zhao 已提交
781
      copyPrevGroupKey(&pOperator->exprSupp, pSliceInfo->pPrevGroupKey, pBlock);
H
Haojun Liao 已提交
782 783
    }

G
Ganlin Zhao 已提交
784 785
    // check if need to interpolate after last datablock
    // except for fill(next), fill(linear)
G
Ganlin Zhao 已提交
786
    genInterpAfterDataBlock(pSliceInfo, pOperator, 0);
H
Haojun Liao 已提交
787

G
Ganlin Zhao 已提交
788 789 790 791
    doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL);
    if (pOperator->status == OP_EXEC_DONE) {
      break;
    }
G
Ganlin Zhao 已提交
792 793

    // restore initial value for next group
G
Ganlin Zhao 已提交
794
    resetTimesliceInfo(pSliceInfo);
G
Ganlin Zhao 已提交
795 796 797
    if (pResBlock->info.rows >= 4096) {
      break;
    }
G
Ganlin Zhao 已提交
798
  }
799

H
Haojun Liao 已提交
800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834
  // restore the value
  setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
  if (pResBlock->info.rows == 0) {
    pOperator->status = OP_EXEC_DONE;
  }

  return pResBlock->info.rows == 0 ? NULL : pResBlock;
}

SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo) {
  STimeSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STimeSliceOperatorInfo));
  SOperatorInfo*          pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pOperator == NULL || pInfo == NULL) {
    goto _error;
  }

  SInterpFuncPhysiNode* pInterpPhyNode = (SInterpFuncPhysiNode*)pPhyNode;
  SExprSupp*            pSup = &pOperator->exprSupp;

  int32_t    numOfExprs = 0;
  SExprInfo* pExprInfo = createExprInfo(pInterpPhyNode->pFuncs, NULL, &numOfExprs);
  int32_t    code = initExprSupp(pSup, pExprInfo, numOfExprs);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  if (pInterpPhyNode->pExprs != NULL) {
    int32_t    num = 0;
    SExprInfo* pScalarExprInfo = createExprInfo(pInterpPhyNode->pExprs, NULL, &num);
    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, num);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

835 836 837 838 839
  code = filterInitFromNode((SNode*)pInterpPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
840 841 842 843 844 845 846 847 848 849
  pInfo->tsCol = extractColumnFromColumnNode((SColumnNode*)pInterpPhyNode->pTimeSeries);
  pInfo->fillType = convertFillType(pInterpPhyNode->fillMode);
  initResultSizeInfo(&pOperator->resultInfo, 4096);

  pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, NULL, 0, (SNodeListNode*)pInterpPhyNode->pFillValues);
  pInfo->pLinearInfo = NULL;
  pInfo->pRes = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
  pInfo->win = pInterpPhyNode->timeRange;
  pInfo->interval.interval = pInterpPhyNode->interval;
  pInfo->current = pInfo->win.skey;
G
Ganlin Zhao 已提交
850 851
  pInfo->prevTsSet = false;
  pInfo->prevTs = 0;
G
Ganlin Zhao 已提交
852
  pInfo->groupId = 0;
G
Ganlin Zhao 已提交
853
  pInfo->pPrevGroupKey = NULL;
G
Ganlin Zhao 已提交
854
  pInfo->pNextGroupRes = NULL;
H
Haojun Liao 已提交
855 856 857 858 859 860 861 862 863

  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info;
    pScanInfo->base.cond.twindows = pInfo->win;
    pScanInfo->base.cond.type = TIMEWINDOW_RANGE_EXTERNAL;
  }

  setOperatorInfo(pOperator, "TimeSliceOperator", QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
X
Xiaoyu Wang 已提交
864 865
  pOperator->fpSet =
      createOperatorFpSet(optrDummyOpenFn, doTimeslice, NULL, destroyTimeSliceOperatorInfo, optrDefaultBufFn, NULL);
H
Haojun Liao 已提交
866 867 868 869 870 871

  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

  code = appendDownstream(pOperator, &downstream, 1);
  return pOperator;

X
Xiaoyu Wang 已提交
872
_error:
H
Haojun Liao 已提交
873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}

void destroyTimeSliceOperatorInfo(void* param) {
  STimeSliceOperatorInfo* pInfo = (STimeSliceOperatorInfo*)param;

  pInfo->pRes = blockDataDestroy(pInfo->pRes);

  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pPrevRow); ++i) {
    SGroupKeys* pKey = taosArrayGet(pInfo->pPrevRow, i);
    taosMemoryFree(pKey->pData);
  }
  taosArrayDestroy(pInfo->pPrevRow);

  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pNextRow); ++i) {
    SGroupKeys* pKey = taosArrayGet(pInfo->pNextRow, i);
    taosMemoryFree(pKey->pData);
  }
  taosArrayDestroy(pInfo->pNextRow);

  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pLinearInfo); ++i) {
    SFillLinearInfo* pKey = taosArrayGet(pInfo->pLinearInfo, i);
    taosMemoryFree(pKey->start.val);
    taosMemoryFree(pKey->end.val);
  }
  taosArrayDestroy(pInfo->pLinearInfo);
G
Ganlin Zhao 已提交
902

903 904 905 906
  if (pInfo->pPrevGroupKey) {
    taosMemoryFree(pInfo->pPrevGroupKey->pData);
    taosMemoryFree(pInfo->pPrevGroupKey);
  }
G
Ganlin Zhao 已提交
907

D
dapan1121 已提交
908
  cleanupExprSupp(&pInfo->scalarSup);
H
Haojun Liao 已提交
909

G
Ganlin Zhao 已提交
910 911 912
  for (int32_t i = 0; i < pInfo->pFillColInfo->numOfFillExpr; ++i) {
    taosVariantDestroy(&pInfo->pFillColInfo[i].fillVal);
  }
H
Haojun Liao 已提交
913 914 915
  taosMemoryFree(pInfo->pFillColInfo);
  taosMemoryFreeClear(param);
}