tfill.c 21.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "os.h"
#include "taosdef.h"
H
Hongze Cheng 已提交
18
#include "tmsg.h"
19 20
#include "ttypes.h"

S
common  
Shengliang Guan 已提交
21
#include "tcommon.h"
22
#include "thash.h"
23 24
#include "ttime.h"

25 26 27 28 29 30
#include "function.h"
#include "tdatablock.h"
#include "executorInt.h"
#include "querynodes.h"
#include "tfill.h"

31 32 33 34 35 36 37 38 39 40
#define FILL_IS_ASC_FILL(_f) ((_f)->order == TSDB_ORDER_ASC)
#define DO_INTERPOLATION(_v1, _v2, _k1, _k2, _k) ((_v1) + ((_v2) - (_v1)) * (((double)(_k)) - ((double)(_k1))) / (((double)(_k2)) - ((double)(_k1))))

static void setTagsValue(SFillInfo* pFillInfo, void** data, int32_t genRows) {
  for(int32_t j = 0; j < pFillInfo->numOfCols; ++j) {
    SFillColInfo* pCol = &pFillInfo->pFillCol[j];
    if (TSDB_COL_IS_NORMAL_COL(pCol->flag) || TSDB_COL_IS_UD_COL(pCol->flag)) {
      continue;
    }

41 42
    SResSchema* pSchema = &pCol->pExpr->base.resSchema;
    char* val1 = elePtrAt(data[j], pSchema->bytes, genRows);
43 44 45

    assert(pCol->tagIndex >= 0 && pCol->tagIndex < pFillInfo->numOfTags);
    SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex];
46
    assignVal(val1, pTag->tagVal, pSchema->bytes, pSchema->type);
47 48 49
  }
}

50
static void setNullRow(SSDataBlock* pBlock, int32_t numOfCol, int32_t rowIndex) {
51
  // the first are always the timestamp column, so start from the second column.
52 53 54
  for (int32_t i = 1; i < pBlock->info.numOfCols; ++i) {
    SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, i);
    colDataAppendNULL(p, rowIndex);
55 56 57
  }
}

58 59 60 61
#define GET_DEST_SLOT_ID(_p)  ((_p)->pExpr->base.resSchema.slotId)
#define GET_SRC_SLOT_ID(_p)  ((_p)->pExpr->base.pParam[0].pCol->slotId)

static void doSetVal(SColumnInfoData* pDstColInfoData, int32_t rowIndex, const SGroupKeys* pKey);
62

63
static void doFillOneRowResult(SFillInfo* pFillInfo, SSDataBlock *pBlock, SSDataBlock* pSrcBlock, int64_t ts, bool outOfBound) {
64 65 66 67 68
  SPoint point1, point2, point;
  int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);

  // set the primary timestamp column value
  int32_t index = pFillInfo->numOfCurrent;
69 70 71
  SColumnInfoData *pCol0 = taosArrayGet(pBlock->pDataBlock, 0);
  char* val = colDataGetData(pCol0, index);
  
72 73 74 75
  *(TSKEY*) val = pFillInfo->currentKey;

  // set the other values
  if (pFillInfo->type == TSDB_FILL_PREV) {
76
    SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev : pFillInfo->next;
77

78 79 80 81
    for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) {
      SFillColInfo* pCol = &pFillInfo->pFillCol[i];
      if (TSDB_COL_IS_TAG(pCol->flag)) {
        continue;
82
      }
83 84 85 86

      SGroupKeys* pKey = taosArrayGet(p, i);
      SColumnInfoData* pDstColInfoData = taosArrayGet(pBlock->pDataBlock, GET_DEST_SLOT_ID(pCol));
      doSetVal(pDstColInfoData, index, pKey);
87 88
    }
  } else if (pFillInfo->type == TSDB_FILL_NEXT) {
89
    SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->next : pFillInfo->prev;
90

91 92 93 94
    for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) {
      SFillColInfo* pCol = &pFillInfo->pFillCol[i];
      if (TSDB_COL_IS_TAG(pCol->flag)) {
        continue;
95
      }
96 97 98 99

      SGroupKeys* pKey = taosArrayGet(p, i);
      SColumnInfoData* pDstColInfoData = taosArrayGet(pBlock->pDataBlock, GET_DEST_SLOT_ID(pCol));
      doSetVal(pDstColInfoData, index, pKey);
100 101 102
    }
  } else if (pFillInfo->type == TSDB_FILL_LINEAR) {
    // TODO : linear interpolation supports NULL value
103 104 105
    if (outOfBound) {
      setNullRow(pBlock, pFillInfo->numOfCols, index);
    } else {
106 107 108 109 110 111
      for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) {
        SFillColInfo* pCol = &pFillInfo->pFillCol[i];
        if (TSDB_COL_IS_TAG(pCol->flag)) {
          continue;
        }

112 113 114 115
        int32_t srcSlotId = GET_SRC_SLOT_ID(pCol);

        int32_t dstSlotId = GET_DEST_SLOT_ID(pCol);
        SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
116

117 118 119 120
        int16_t type = pCol->pExpr->base.resSchema.type;
        SGroupKeys* pKey = taosArrayGet(pFillInfo->prev, i);
        if (IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pKey->isNull) {
          colDataAppendNULL(pDstCol, index);
121 122 123
          continue;
        }

124 125 126 127 128 129 130 131 132 133 134
        SGroupKeys* pKey1 = taosArrayGet(pFillInfo->prev, 0);
        int64_t prevTs = *(int64_t*)pKey1->pData;

        SColumnInfoData* pSrcCol = taosArrayGet(pSrcBlock->pDataBlock, srcSlotId);
        char* data = colDataGetData(pSrcCol, pFillInfo->index);

        point1 = (SPoint){.key = prevTs, .val = pKey->pData};
        point2 = (SPoint){.key = ts, .val = data};

        int64_t out = 0;
        point = (SPoint){.key = pFillInfo->currentKey, .val = &out};
135
        taosGetLinearInterpolationVal(&point, type, &point1, &point2, type);
136 137

        colDataAppend(pDstCol, index, (const char*)&out, false);
138 139
      }
    }
140 141 142
  } else if (pFillInfo->type == TSDB_FILL_NULL) {  // fill with NULL
    setNullRow(pBlock, pFillInfo->numOfCols, index);
  } else { // fill with user specified value for each column
143 144
    for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) {
      SFillColInfo* pCol = &pFillInfo->pFillCol[i];
145
      if (TSDB_COL_IS_TAG(pCol->flag)/* || IS_VAR_DATA_TYPE(pCol->schema.type)*/) {
146 147 148
        continue;
      }

149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
      SVariant* pVar = &pFillInfo->pFillCol[i].fillVal;

      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
      if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) {
        float v = 0;
        GET_TYPED_DATA(v, float, pVar->nType, &pVar->i);
        colDataAppend(pDst, index, (char*)&v, false);
      } else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) {
        double v = 0;
        GET_TYPED_DATA(v, double, pVar->nType, &pVar->i);
        colDataAppend(pDst, index, (char*)&v, false);
      } else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) {
        int64_t v = 0;
        GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i);
        colDataAppend(pDst, index, (char*)&v, false);
      }
165 166 167
    }
  }

168 169 170
//  setTagsValue(pFillInfo, data, index);
  SInterval* pInterval = &pFillInfo->interval;
  pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pInterval->sliding * step, pInterval->slidingUnit, pInterval->precision);
171 172 173
  pFillInfo->numOfCurrent++;
}

174 175 176 177 178 179 180 181 182 183
void doSetVal(SColumnInfoData* pDstCol, int32_t rowIndex, const SGroupKeys* pKey) {
  if (pKey->isNull) {
    colDataAppendNULL(pDstCol, rowIndex);
  } else {
    colDataAppend(pDstCol, rowIndex, pKey->pData, false);
  }
}

static void initBeforeAfterDataBuf(SFillInfo* pFillInfo) {
  if (taosArrayGetSize(pFillInfo->next) > 0) {
184 185 186
    return;
  }

187
  for (int i = 0; i < pFillInfo->numOfCols; i++) {
188
    SFillColInfo* pCol = &pFillInfo->pFillCol[i];
189 190 191 192 193 194 195 196 197 198 199 200

    SGroupKeys key = {0};
    SResSchema* pSchema = &pCol->pExpr->base.resSchema;
    key.pData  = taosMemoryMalloc(pSchema->bytes);
    key.isNull = true;
    key.bytes = pSchema->bytes;
    key.type  = pSchema->type;

    taosArrayPush(pFillInfo->next, &key);

    key.pData = taosMemoryMalloc(pSchema->bytes);
    taosArrayPush(pFillInfo->prev, &key);
201 202 203
  }
}

204 205 206
static void saveColData(SArray* rowBuf, int32_t columnIndex, const char* src, bool isNull);

static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, int32_t rowIndex, SArray* pRow) {
207
  for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
208 209 210 211 212 213 214
    int32_t srcSlotId = GET_SRC_SLOT_ID(&pFillInfo->pFillCol[i]);

    SColumnInfoData* pSrcCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId);

    bool isNull = colDataIsNull_s(pSrcCol, rowIndex);
    char* p = colDataGetData(pSrcCol, rowIndex);
    saveColData(pRow, i, p, isNull);
215 216 217
  }
}

218
static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t outputRows) {
219 220
  pFillInfo->numOfCurrent = 0;

221 222
  // todo make sure the first column is always the primary timestamp column?
  SColumnInfoData* pTsCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, 0);
223 224

  int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
225
  bool ascFill = FILL_IS_ASC_FILL(pFillInfo);
226

227 228 229
#if 0
  ASSERT(ascFill && (pFillInfo->currentKey >= pFillInfo->start) || (!ascFill && (pFillInfo->currentKey <= pFillInfo->start)));
#endif
230 231

  while (pFillInfo->numOfCurrent < outputRows) {
232
    int64_t ts = ((int64_t*)pTsCol->pData)[pFillInfo->index];
233 234

    // set the next value for interpolation
235 236
    if ((pFillInfo->currentKey < ts && ascFill) || (pFillInfo->currentKey > ts && !ascFill)) {
      copyCurrentRowIntoBuf(pFillInfo, pFillInfo->index, pFillInfo->next);
237 238
    }

239 240 241 242
    if (((pFillInfo->currentKey < ts && ascFill) || (pFillInfo->currentKey > ts && !ascFill)) && pFillInfo->numOfCurrent < outputRows) {
      // fill the gap between two input rows
      while (((pFillInfo->currentKey < ts && ascFill) || (pFillInfo->currentKey > ts && !ascFill)) && pFillInfo->numOfCurrent < outputRows) {
        doFillOneRowResult(pFillInfo, pBlock, pFillInfo->pSrcBlock, ts, false);
243 244 245 246 247 248 249 250 251
      }

      // output buffer is full, abort
      if (pFillInfo->numOfCurrent == outputRows) {
        pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
        return outputRows;
      }
    } else {
      assert(pFillInfo->currentKey == ts);
252

253 254
      if (pFillInfo->type == TSDB_FILL_NEXT && (pFillInfo->index + 1) < pFillInfo->numOfRows) {
        ++pFillInfo->index;
255
        copyCurrentRowIntoBuf(pFillInfo, pFillInfo->index, pFillInfo->next);
256 257 258 259 260 261
        --pFillInfo->index;
      }

      // assign rows to dst buffer
      for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
        SFillColInfo* pCol = &pFillInfo->pFillCol[i];
262
        if (TSDB_COL_IS_TAG(pCol->flag)/* || IS_VAR_DATA_TYPE(pCol->schema.type)*/) {
263 264 265
          continue;
        }

266 267
        int32_t srcSlotId = GET_SRC_SLOT_ID(pCol);
        int32_t dstSlotId = GET_DEST_SLOT_ID(pCol);
268

269 270 271 272 273 274 275 276 277
        SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, dstSlotId);
        SColumnInfoData* pSrc = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId);

        char* src = colDataGetData(pSrc, pFillInfo->index);
        if (i == 0 || (/*pCol->functionId != FUNCTION_COUNT &&*/ !colDataIsNull_s(pSrc, pFillInfo->index)) /*||
            (pCol->functionId == FUNCTION_COUNT && GET_INT64_VAL(src) != 0)*/) {
          bool isNull = colDataIsNull_s(pSrc, pFillInfo->index);
          colDataAppend(pDst, pFillInfo->numOfCurrent, src, isNull);
          saveColData(pFillInfo->prev, i, src, isNull);
278 279
        } else {  // i > 0 and data is null , do interpolation
          if (pFillInfo->type == TSDB_FILL_PREV) {
280 281
            SGroupKeys *pKey = taosArrayGet(pFillInfo->prev, i);
            doSetVal(pDst, pFillInfo->numOfCurrent, pKey);
282
          } else if (pFillInfo->type == TSDB_FILL_LINEAR) {
283 284 285 286 287
            bool isNull = colDataIsNull_s(pSrc, pFillInfo->index);
            colDataAppend(pDst, pFillInfo->numOfCurrent, src, isNull);
            saveColData(pFillInfo->prev, i, src, isNull);
          } else if (pFillInfo->type == TSDB_FILL_NULL) {
            colDataAppendNULL(pDst, pFillInfo->numOfCurrent);
288
          } else if (pFillInfo->type == TSDB_FILL_NEXT) {
289 290
            SGroupKeys *pKey = taosArrayGet(pFillInfo->next, i);
            doSetVal(pDst, pFillInfo->numOfCurrent, pKey);
291
          } else {
292 293
            SVariant* pVar = &pFillInfo->pFillCol[i].fillVal;
            colDataAppend(pDst, pFillInfo->numOfCurrent, (char*)&pVar->i, false);
294 295 296 297 298
          }
        }
      }

      // set the tag value for final result
299 300 301
//      setTagsValue(pFillInfo, data, pFillInfo->numOfCurrent);
      SInterval *pInterval = &pFillInfo->interval;
      pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pInterval->sliding * step, pInterval->slidingUnit, pInterval->precision);
302 303 304 305 306 307 308

      pFillInfo->index += 1;
      pFillInfo->numOfCurrent += 1;
    }

    if (pFillInfo->index >= pFillInfo->numOfRows || pFillInfo->numOfCurrent >= outputRows) {
      /* the raw data block is exhausted, next value does not exists */
309 310 311
//      if (pFillInfo->index >= pFillInfo->numOfRows) {
//        taosMemoryFreeClear(*next);
//      }
312 313 314 315 316 317 318 319
      pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
      return pFillInfo->numOfCurrent;
    }
  }

  return pFillInfo->numOfCurrent;
}

320 321 322 323 324 325 326 327 328 329 330
static void saveColData(SArray* rowBuf, int32_t columnIndex, const char* src, bool isNull) {
  SGroupKeys *pKey = taosArrayGet(rowBuf, columnIndex);
  if (isNull) {
    pKey->isNull = true;
  } else {
    memcpy(pKey->pData, src, pKey->bytes);
    pKey->isNull = false;
  }
}

static int64_t appendFilledResult(SFillInfo* pFillInfo, SSDataBlock* pBlock, int64_t resultCapacity) {
331 332 333 334 335 336
  /*
   * These data are generated according to fill strategy, since the current timestamp is out of the time window of
   * real result set. Note that we need to keep the direct previous result rows, to generated the filled data.
   */
  pFillInfo->numOfCurrent = 0;
  while (pFillInfo->numOfCurrent < resultCapacity) {
337
    doFillOneRowResult(pFillInfo, pBlock, pFillInfo->pSrcBlock, pFillInfo->start, true);
338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353
  }

  pFillInfo->numOfTotal += pFillInfo->numOfCurrent;

  assert(pFillInfo->numOfCurrent == resultCapacity);
  return resultCapacity;
}

// there are no duplicated tags in the SFillTagColInfo list
static int32_t setTagColumnInfo(SFillInfo* pFillInfo, int32_t numOfCols, int32_t capacity) {
  int32_t rowsize = 0;
  int32_t numOfTags = 0;

  int32_t k = 0;
  for (int32_t i = 0; i < numOfCols; ++i) {
    SFillColInfo* pColInfo = &pFillInfo->pFillCol[i];
354
    SResSchema* pSchema = &pColInfo->pExpr->base.resSchema;
355

356
    if (TSDB_COL_IS_TAG(pColInfo->flag) || pSchema->type == TSDB_DATA_TYPE_BINARY) {
357 358 359 360 361
      numOfTags += 1;

      bool exists = false;
      int32_t index = -1;
      for (int32_t j = 0; j < k; ++j) {
362
        if (pFillInfo->pTags[j].col.colId == pSchema->slotId) {
363 364 365 366 367 368 369
          exists = true;
          index = j;
          break;
        }
      }

      if (!exists) {
370 371 372 373
        SSchema* pSchema1 = &pFillInfo->pTags[k].col;
        pSchema1->colId = pSchema->slotId;
        pSchema1->type  = pSchema->type;
        pSchema1->bytes = pSchema->bytes;
374

375
        pFillInfo->pTags[k].tagVal = taosMemoryCalloc(1, pSchema->bytes);
376 377 378 379 380 381 382 383
        pColInfo->tagIndex = k;

        k += 1;
      } else {
        pColInfo->tagIndex = index;
      }
    }

384
    rowsize += pSchema->bytes;
385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400
  }

  pFillInfo->numOfTags = numOfTags;

  assert(k <= pFillInfo->numOfTags);
  return rowsize;
}

static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) {
  if (pFillInfo->numOfRows == 0 || (pFillInfo->numOfRows > 0 && pFillInfo->index >= pFillInfo->numOfRows)) {
    return 0;
  }

  return pFillInfo->numOfRows - pFillInfo->index;
}

401
struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
402
                            SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, const char* id) {
403 404 405 406
  if (fillType == TSDB_FILL_NONE) {
    return NULL;
  }

wafwerar's avatar
wafwerar 已提交
407
  SFillInfo* pFillInfo = taosMemoryCalloc(1, sizeof(SFillInfo));
408 409 410 411 412
  if (pFillInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

413
  taosResetFillInfo(pFillInfo, skey);
414 415 416 417 418 419 420 421
  pFillInfo->order = order;

  switch(fillType) {
    case FILL_MODE_NONE:   pFillInfo->type = TSDB_FILL_NONE;  break;
    case FILL_MODE_PREV:   pFillInfo->type = TSDB_FILL_PREV;  break;
    case FILL_MODE_NULL:   pFillInfo->type = TSDB_FILL_NULL;  break;
    case FILL_MODE_LINEAR: pFillInfo->type = TSDB_FILL_LINEAR;break;
    case FILL_MODE_NEXT:   pFillInfo->type = TSDB_FILL_NEXT;  break;
422
    case FILL_MODE_VALUE:  pFillInfo->type = TSDB_FILL_SET_VALUE; break;
423 424 425 426 427
    default:
      terrno = TSDB_CODE_INVALID_PARA;
      return NULL;
  }

428 429 430 431 432
  pFillInfo->type      = fillType;
  pFillInfo->pFillCol  = pCol;
  pFillInfo->numOfTags = numOfTags;
  pFillInfo->numOfCols = numOfCols;
  pFillInfo->alloc     = capacity;
H
Haojun Liao 已提交
433
  pFillInfo->id        = id;
434
  pFillInfo->interval  = *pInterval;
435 436

//  if (numOfTags > 0) {
wafwerar's avatar
wafwerar 已提交
437
    pFillInfo->pTags = taosMemoryCalloc(numOfCols, sizeof(SFillTagColInfo));
438 439 440 441 442
    for (int32_t i = 0; i < numOfCols; ++i) {
      pFillInfo->pTags[i].col.colId = -2;  // TODO
    }
//  }

443 444 445 446 447
  pFillInfo->next = taosArrayInit(numOfCols, sizeof(SGroupKeys));
  pFillInfo->prev = taosArrayInit(numOfCols, sizeof(SGroupKeys));

  initBeforeAfterDataBuf(pFillInfo);

448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467
  pFillInfo->rowSize = setTagColumnInfo(pFillInfo, pFillInfo->numOfCols, pFillInfo->alloc);
  assert(pFillInfo->rowSize > 0);
  return pFillInfo;
}

void taosResetFillInfo(SFillInfo* pFillInfo, TSKEY startTimestamp) {
  pFillInfo->start        = startTimestamp;
  pFillInfo->currentKey   = startTimestamp;
  pFillInfo->end          = startTimestamp;
  pFillInfo->index        = -1;
  pFillInfo->numOfRows    = 0;
  pFillInfo->numOfCurrent = 0;
  pFillInfo->numOfTotal   = 0;
}

void* taosDestroyFillInfo(SFillInfo* pFillInfo) {
  if (pFillInfo == NULL) {
    return NULL;
  }

468 469
  taosArrayDestroy(pFillInfo->prev);
  taosArrayDestroy(pFillInfo->next);
470 471

  for(int32_t i = 0; i < pFillInfo->numOfTags; ++i) {
wafwerar's avatar
wafwerar 已提交
472
    taosMemoryFreeClear(pFillInfo->pTags[i].tagVal);
473 474
  }

wafwerar's avatar
wafwerar 已提交
475 476 477
  taosMemoryFreeClear(pFillInfo->pTags);
  taosMemoryFreeClear(pFillInfo->pFillCol);
  taosMemoryFreeClear(pFillInfo);
478 479 480 481 482 483 484 485 486 487
  return NULL;
}

void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey) {
  if (pFillInfo->type == TSDB_FILL_NONE) {
    return;
  }

  pFillInfo->end = endKey;
  if (!FILL_IS_ASC_FILL(pFillInfo)) {
488
    pFillInfo->end = taosTimeTruncate(endKey, &pFillInfo->interval, pFillInfo->interval.precision);
489 490 491 492 493 494 495
  }

  pFillInfo->index     = 0;
  pFillInfo->numOfRows = numOfRows;
}

void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const SSDataBlock* pInput) {
496
  pFillInfo->pSrcBlock = (SSDataBlock*) pInput;
497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513
}

bool taosFillHasMoreResults(SFillInfo* pFillInfo) {
  int32_t remain = taosNumOfRemainRows(pFillInfo);
  if (remain > 0) {
    return true;
  }

  if (pFillInfo->numOfTotal > 0 && (((pFillInfo->end > pFillInfo->start) && FILL_IS_ASC_FILL(pFillInfo)) ||
                                    (pFillInfo->end < pFillInfo->start && !FILL_IS_ASC_FILL(pFillInfo)))) {
    return getNumOfResultsAfterFillGap(pFillInfo, pFillInfo->end, 4096) > 0;
  }

  return false;
}

int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t maxNumOfRows) {
514
  SColumnInfoData* pCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, 0);
515

516
  int64_t* tsList = (int64_t*) pCol->pData;
517 518 519 520
  int32_t numOfRows = taosNumOfRemainRows(pFillInfo);

  TSKEY ekey1 = ekey;
  if (!FILL_IS_ASC_FILL(pFillInfo)) {
521
    pFillInfo->end = taosTimeTruncate(ekey, &pFillInfo->interval, pFillInfo->interval.precision);
522 523 524 525 526 527 528 529 530 531
  }

  int64_t numOfRes = -1;
  if (numOfRows > 0) {  // still fill gap within current data block, not generating data after the result set.
    TSKEY lastKey = tsList[pFillInfo->numOfRows - 1];
    numOfRes = taosTimeCountInterval(
      lastKey,
      pFillInfo->currentKey,
      pFillInfo->interval.sliding,
      pFillInfo->interval.slidingUnit,
532
      pFillInfo->interval.precision);
533 534 535 536 537 538 539 540 541 542 543 544
    numOfRes += 1;
    assert(numOfRes >= numOfRows);
  } else { // reach the end of data
    if ((ekey1 < pFillInfo->currentKey && FILL_IS_ASC_FILL(pFillInfo)) ||
        (ekey1 > pFillInfo->currentKey && !FILL_IS_ASC_FILL(pFillInfo))) {
      return 0;
    }
    numOfRes = taosTimeCountInterval(
      ekey1,
      pFillInfo->currentKey,
      pFillInfo->interval.sliding,
      pFillInfo->interval.slidingUnit,
545
      pFillInfo->interval.precision);
546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562
    numOfRes += 1;
  }

  return (numOfRes > maxNumOfRows) ? maxNumOfRows : numOfRes;
}

int32_t taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint* point1, SPoint* point2, int32_t inputType) {
  double v1 = -1, v2 = -1;
  GET_TYPED_DATA(v1, double, inputType, point1->val);
  GET_TYPED_DATA(v2, double, inputType, point2->val);

  double r = DO_INTERPOLATION(v1, v2, point1->key, point2->key, point->key);
  SET_TYPED_DATA(point->val, outputType, r);

  return TSDB_CODE_SUCCESS;
}

563
int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, SSDataBlock* p, int32_t capacity) {
564 565 566 567 568 569 570
  int32_t remain = taosNumOfRemainRows(pFillInfo);

  int64_t numOfRes = getNumOfResultsAfterFillGap(pFillInfo, pFillInfo->end, capacity);
  assert(numOfRes <= capacity);

  // no data existed for fill operation now, append result according to the fill strategy
  if (remain == 0) {
571
    appendFilledResult(pFillInfo, p, numOfRes);
572
  } else {
573
    fillResultImpl(pFillInfo, p, (int32_t) numOfRes);
574 575 576 577 578 579 580 581 582
    assert(numOfRes == pFillInfo->numOfCurrent);
  }

//  qDebug("fill:%p, generated fill result, src block:%d, index:%d, brange:%"PRId64"-%"PRId64", currentKey:%"PRId64", current:%d, total:%d, %p",
//      pFillInfo, pFillInfo->numOfRows, pFillInfo->index, pFillInfo->start, pFillInfo->end, pFillInfo->currentKey, pFillInfo->numOfCurrent,
//         pFillInfo->numOfTotal, pFillInfo->handle);

  return numOfRes;
}
583 584 585 586 587

int64_t getFillInfoStart(struct SFillInfo *pFillInfo) {
  return pFillInfo->start;
}

588 589
SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const struct SNodeListNode* pValNode) {
  SFillColInfo* pFillCol = taosMemoryCalloc(numOfOutput, sizeof(SFillColInfo));
590 591 592 593
  if (pFillCol == NULL) {
    return NULL;
  }

594
  size_t len = (pValNode != NULL)? LIST_LENGTH(pValNode->pNodeList):0;
595
  for(int32_t i = 0; i < numOfOutput; ++i) {
596 597 598
    SExprInfo* pExprInfo = &pExpr[i];
    pFillCol[i].pExpr = pExprInfo;
    pFillCol[i].tagIndex = -2;
599

600 601 602 603 604 605 606 607
    // todo refactor
    if (len > 0) {
      // if the user specified value is less than the column, alway use the last one as the fill value
      int32_t index = (i >= len)? (len - 1):i;

      SValueNode* pv = (SValueNode*)nodesListGetNode(pValNode->pNodeList, index);
      valueNodeToVariant(pv, &pFillCol[i].fillVal);
    }
608 609 610 611

    if (pExprInfo->base.numOfParams > 0) {
      pFillCol[i].flag = pExprInfo->base.pParam[0].pCol->flag;    // always be the normal column for table query
    }
612 613 614 615
  }

  return pFillCol;
}