builtinsimpl.c 46.9 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "builtinsimpl.h"
17
#include <libs/nodes/querynodes.h>
18
#include "querynodes.h"
H
Haojun Liao 已提交
19 20
#include "taggfunction.h"
#include "tdatablock.h"
21
#include "tpercentile.h"
H
Haojun Liao 已提交
22

G
Ganlin Zhao 已提交
23 24 25 26 27 28 29 30 31 32 33 34 35 36
// Running total kept by the sum/avg functions. Only one member is meaningful at a
// time; which one is chosen by the input column type.
typedef struct SSumRes {
  union {
    int64_t  isum;  // used by the signed-integer (and bool) branches
    uint64_t usum;  // used by the unsigned-integer branches
    double   dsum;  // used by the float/double branches
  };
} SSumRes;

// Intermediate state of the avg function.
typedef struct SAvgRes {
  double  result;  // final average, written by avgFinalize
  SSumRes sum;     // running sum of the non-null input values
  int64_t count;   // number of non-null input values accumulated so far
} SAvgRes;

37 38 39 40 41 42 43 44 45
// Element type of the STopBotRes item list (see STopBotRes.pItems).
typedef struct STopBotResItem {
  SVariant v;          // NOTE(review): presumably the value of the chosen row - confirm
  uint64_t uid;        // it is a table uid, used to extract tag data during building of the final result for the tag data
  struct {
   int32_t pageId;
   int32_t offset;
  } tuplePos;          // tuple data of this chosen row
} STopBotResItem;

G
Ganlin Zhao 已提交
46
typedef struct STopBotRes {
47 48
  int32_t         pageId;
//  int32_t   num;
49
  STopBotResItem *pItems;
G
Ganlin Zhao 已提交
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
} STopBotRes;

// Intermediate state of the stddev function.
typedef struct SStddevRes {
  double  result;  // final standard deviation, written by stddevFinalize
  int64_t count;   // number of non-null input values accumulated so far
  // sum of squares: the double member is used for float/double input, the int64 member for integer input
  union  {double  quadraticDSum; int64_t quadraticISum;};
  // plain sum, with the same float/integer split as above
  union  {double  dsum; int64_t isum;};
} SStddevRes;

// Intermediate state of the percentile function.
typedef struct SPercentileInfo {
  double      result;      // NOTE(review): presumably the final percentile value; not written in the visible code
  tMemBucket *pMemBucket;  // created by percentileFunction via tMemBucketCreate when the stage leaves 0 and data exists
  int32_t     stage;       // 0 while only min/max are collected; incremented once the repeat scan starts
  double      minval;      // seeded with DBL_MAX by percentileFunctionSetup
  double      maxval;      // seeded with -DBL_MAX by percentileFunctionSetup
  int64_t     numOfElems;  // reset to 0 by percentileFunctionSetup; 0 at the repeat scan means "all data are null"
} SPercentileInfo;

// State record for a diff-style function.
// NOTE(review): no user of this struct is visible in this part of the file; the
// field notes below are inferred from the names only and must be confirmed.
typedef struct SDiffInfo {
  bool  hasPrev;         // presumably: a previous value has been stored in prev
  bool  includeNull;     // presumably: null inputs take part in the output
  bool  ignoreNegative;  // presumably: negative differences are skipped
  bool  firstOutput;     // presumably: tracks whether the first result row was emitted
  union { int64_t i64; double d64;} prev;  // previous value, as integer or as double
} SDiffInfo;

H
Haojun Liao 已提交
76 77 78 79 80 81 82 83
// Stores (res) into (_info)->numOfRes, but only when (numOfElem) is positive.
// With a zero or negative element count the entry is left untouched and (res)
// is not evaluated.
#define SET_VAL(_info, numOfElem, res) \
  do {                                 \
    if ((numOfElem) > 0) {             \
      (_info)->numOfRes = (res);       \
    }                                  \
  } while (0)

G
Ganlin Zhao 已提交
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
// Views the ptsList member of a context as an array of TSKEY values.
#define GET_TS_LIST(x)    ((TSKEY*)((x)->ptsList))
// Element y of that array. No NULL check and no bounds check is done here.
#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)])

// Runs the process callback of every tag-column context attached to ctx
// (ctx->tagInfo.pTagCtxList[0 .. numOfTagCols-1]).
//
// The expansion is a single do/while(0) statement WITHOUT a trailing semicolon:
// the caller supplies the ';'. This keeps the macro usable as the body of an
// unbraced if that is followed by else. Note that ctx is evaluated more than once.
#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx)                      \
  do {                                                             \
    for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \
      SqlFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[_i];      \
      __ctx->fpSet.process(__ctx);                                 \
    }                                                              \
  } while (0)

// Runs the process callback of every subsidiary-result context attached to ctx
// (ctx->subsidiaryRes.pCtx[0 .. numOfCols-1]).
// Before the call, a context whose functionId is FUNCTION_TS_DUMMY gets ts stored
// in its tag value, with the tag type set to TSDB_DATA_TYPE_BIGINT.
// ctx and ts are evaluated more than once; the expansion declares _i and __ctx.
#define DO_UPDATE_SUBSID_RES(ctx, ts)                                 \
  do {                                                                \
    for (int32_t _i = 0; _i < (ctx)->subsidiaryRes.numOfCols; ++_i) { \
      SqlFunctionCtx *__ctx = (ctx)->subsidiaryRes.pCtx[_i];          \
      if (__ctx->functionId == FUNCTION_TS_DUMMY) {                   \
        __ctx->tag.i = (ts);                                          \
        __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT;                     \
      }                                                               \
      __ctx->fpSet.process(__ctx);                                    \
    }                                                                 \
  } while (0)

// Conditional replace step shared by the min/max code.
//   sign == 0: left is replaced by right when left <  right (keeps the larger value)
//   sign == 1: left is replaced by right when left >= right (keeps the smaller value;
//              an equal value also counts as a replacement)
// On replacement the subsidiary results are refreshed with _ts and num is
// incremented by one; otherwise nothing changes. sign must be exactly 0 or 1
// because it is combined with the comparison result by XOR.
#define UPDATE_DATA(ctx, left, right, num, sign, _ts) \
  do {                                                \
    if (((left) < (right)) ^ (sign)) {                \
      (left) = (right);                               \
      DO_UPDATE_SUBSID_RES(ctx, _ts);                 \
      (num) += 1;                                     \
    }                                                 \
  } while (0)

// Applies UPDATE_DATA to rows [_start, _start + _nrow) of column _col, reading the
// column data as an array of _t and skipping rows flagged in the null bitmap.
// The timestamp handed to UPDATE_DATA is taken from the ctx timestamp list, or is 0
// when ctx has no such list.
// num therefore counts the rows that replaced val, not the non-null rows.
// The expansion declares d, i and ts; arguments must not use those names.
#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num)        \
  do {                                                                   \
    _t *d = (_t *)((_col)->pData);                                       \
    for (int32_t i = (_start); i < (_nrow) + (_start); ++i) {            \
      if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \
        continue;                                                        \
      }                                                                  \
      TSKEY ts = (ctx)->ptsList != NULL ? GET_TS_DATA(ctx, i) : 0;       \
      UPDATE_DATA(ctx, val, d[i], num, sign, ts);                        \
    }                                                                    \
  } while (0)

H
Haojun Liao 已提交
128 129 130 131 132 133 134 135 136 137 138 139 140 141

/*
 * Common first-time initialization of a result entry.
 *
 * Returns false, changing nothing, when the entry is already initialized.
 * Otherwise zeroes the output buffer (if there is one), initializes the entry
 * with the intermediate buffer size of the context, and returns true.
 */
bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
  if (!pResultInfo->initialized) {
    if (NULL != pCtx->pOutput) {
      memset(pCtx->pOutput, 0, (size_t)pCtx->resDataInfo.bytes);
    }

    initResultRowEntry(pResultInfo, pCtx->resDataInfo.interBufSize);
    return true;
  }

  return false;
}

142 143
/*
 * Generic finalize step: appends the intermediate buffer of the function as one
 * cell of column slotId in pBlock, at row index pBlock->info.rows.
 *
 * The cell is marked null when the entry produced no result (numOfRes == 0).
 * Returns the numOfRes value of the entry.
 */
int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) {
  SResultRowEntryInfo* pEntry = GET_RES_INFO(pCtx);
  SColumnInfoData*     pDstCol = taosArrayGet(pBlock->pDataBlock, slotId);

  pEntry->isNullRes = (pEntry->numOfRes == 0) ? 1 : 0;
  cleanupResultRowEntry(pEntry);

  char* pInterBuf = GET_ROWCELL_INTERBUF(pEntry);
  colDataAppend(pDstCol, pBlock->info.rows, pInterBuf, pEntry->isNullRes);

  return pEntry->numOfRes;
}

155 156 157
/*
 * Tells the scanner how much data count() needs.
 * When the first parameter is the primary timestamp column nothing has to be
 * loaded; for any other parameter the block statistics are requested.
 */
EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) {
  SNode* pFirstArg = nodesListGetNode(pFunc->pParameterList, 0);

  bool onPrimaryTs = (QUERY_NODE_COLUMN == nodeType(pFirstArg)) &&
                     (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pFirstArg)->colId);

  return onPrimaryTs ? FUNC_DATA_REQUIRED_NOT_LOAD : FUNC_DATA_REQUIRED_STATIS_LOAD;
}
H
Haojun Liao 已提交
162 163 164 165 166 167 168 169 170 171

// Reports the intermediate buffer size needed by count: a single int64_t counter.
// The function node is not inspected; always returns true.
bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
  pEnv->calcMemSize = sizeof(int64_t);
  return true;
}

/*
 * count function does need the finalize, if data is missing, the default value, which is 0, is used
 * count function does not use the pCtx->interResBuf to keep the intermediate buffer
 */
H
Haojun Liao 已提交
172
/*
 * Adds the number of non-null rows of the current input to the int64_t counter
 * kept in the intermediate buffer. Always returns TSDB_CODE_SUCCESS.
 */
int32_t countFunction(SqlFunctionCtx *pCtx) {
  SInputColumnInfoData* pIn = &pCtx->input;
  SColumnInfoData*      pColData = pIn->pData[0];
  int32_t               nonNull = 0;

  /*
   * 1. column data missing (schema modified) causes pColData->hasNull == true, pIn->colDataAggIsSet == true;
   * 2. for general non-primary key columns, pColData->hasNull may be true or false, pIn->colDataAggIsSet == true;
   * 3. for the primary key column, pColData->hasNull is always false, pIn->colDataAggIsSet == false;
   */
  if (pIn->colDataAggIsSet && pIn->totalRows == pIn->numOfRows) {
    // the statistics cover exactly the rows being processed: derive the count from them
    nonNull = pIn->numOfRows - pIn->pColumnDataAgg[0]->numOfNull;
    ASSERT(nonNull >= 0);
  } else if (!pColData->hasNull) {
    // counting on a column without nulls and without usable statistics: every row counts
    nonNull = pIn->numOfRows;
  } else {
    for (int32_t row = pIn->startRowIndex; row < pIn->startRowIndex + pIn->numOfRows; ++row) {
      if (!colDataIsNull(pColData, pIn->totalRows, row, NULL)) {
        nonNull += 1;
      }
    }
  }

  SResultRowEntryInfo* pEntry = GET_RES_INFO(pCtx);
  int64_t*             pCounter = (int64_t*)GET_ROWCELL_INTERBUF(pEntry);
  *pCounter += nonNull;

  SET_VAL(pEntry, nonNull, 1);
  return TSDB_CODE_SUCCESS;
}

// Adds every non-null value of rows [_start, _start + _rows) of column _col to
// _res, reading the column data as an array of _t, and increments numOfElem once
// per value added.
//
// Every use of the _col argument is parenthesized, so an expression such as &col
// or cols + 1 selects the intended member instead of binding to "->" by accident.
// The expansion declares d and i; arguments must not use those names.
#define LIST_ADD_N(_res, _col, _start, _rows, _t, numOfElem)             \
  do {                                                                   \
    _t *d = (_t *)((_col)->pData);                                       \
    for (int32_t i = (_start); i < (_rows) + (_start); ++i) {            \
      if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \
        continue;                                                        \
      }                                                                  \
      (_res) += (d)[i];                                                  \
      (numOfElem)++;                                                     \
    }                                                                    \
  } while (0)

H
Haojun Liao 已提交
219
/*
 * Accumulates the current input into the SSumRes kept in the intermediate buffer.
 *
 * When block statistics are set, the pre-computed sum is folded in directly;
 * otherwise the rows [startRowIndex, startRowIndex + numOfRows) are added one by
 * one, skipping nulls. Always returns TSDB_CODE_SUCCESS.
 */
int32_t sumFunction(SqlFunctionCtx *pCtx) {
  SInputColumnInfoData* pIn = &pCtx->input;
  SColumnDataAgg*       pStat = pIn->pColumnDataAgg[0];
  int32_t               colType = pIn->pData[0]->info.type;
  SSumRes*              pAcc = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
  int32_t               nonNull = 0;

  if (pIn->colDataAggIsSet) {
    // only the pre-computed information is loaded, the actual data is not
    nonNull = pIn->numOfRows - pStat->numOfNull;
    ASSERT(nonNull >= 0);

    if (IS_SIGNED_NUMERIC_TYPE(colType)) {
      pAcc->isum += pStat->sum;
    } else if (IS_UNSIGNED_NUMERIC_TYPE(colType)) {
      pAcc->usum += pStat->sum;
    } else if (IS_FLOAT_TYPE(colType)) {
      pAcc->dsum += GET_DOUBLE_VAL((const char*)&(pStat->sum));
    }
  } else {
    // compute from the real data block
    SColumnInfoData* pColData = pIn->pData[0];
    int32_t          first = pIn->startRowIndex;
    int32_t          rows = pIn->numOfRows;

    if (IS_SIGNED_NUMERIC_TYPE(colType) || colType == TSDB_DATA_TYPE_BOOL) {
      switch (colType) {
        case TSDB_DATA_TYPE_BOOL:
        case TSDB_DATA_TYPE_TINYINT:
          LIST_ADD_N(pAcc->isum, pColData, first, rows, int8_t, nonNull);
          break;
        case TSDB_DATA_TYPE_SMALLINT:
          LIST_ADD_N(pAcc->isum, pColData, first, rows, int16_t, nonNull);
          break;
        case TSDB_DATA_TYPE_INT:
          LIST_ADD_N(pAcc->isum, pColData, first, rows, int32_t, nonNull);
          break;
        case TSDB_DATA_TYPE_BIGINT:
          LIST_ADD_N(pAcc->isum, pColData, first, rows, int64_t, nonNull);
          break;
        default:
          break;
      }
    } else if (IS_UNSIGNED_NUMERIC_TYPE(colType)) {
      switch (colType) {
        case TSDB_DATA_TYPE_UTINYINT:
          LIST_ADD_N(pAcc->usum, pColData, first, rows, uint8_t, nonNull);
          break;
        case TSDB_DATA_TYPE_USMALLINT:
          LIST_ADD_N(pAcc->usum, pColData, first, rows, uint16_t, nonNull);
          break;
        case TSDB_DATA_TYPE_UINT:
          LIST_ADD_N(pAcc->usum, pColData, first, rows, uint32_t, nonNull);
          break;
        case TSDB_DATA_TYPE_UBIGINT:
          LIST_ADD_N(pAcc->usum, pColData, first, rows, uint64_t, nonNull);
          break;
        default:
          break;
      }
    } else if (colType == TSDB_DATA_TYPE_DOUBLE) {
      LIST_ADD_N(pAcc->dsum, pColData, first, rows, double, nonNull);
    } else if (colType == TSDB_DATA_TYPE_FLOAT) {
      LIST_ADD_N(pAcc->dsum, pColData, first, rows, float, nonNull);
    }
  }

  // when every checked value is null, no result is flagged
  SET_VAL(GET_RES_INFO(pCtx), nonNull, 1);
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
278
// Reports the intermediate buffer size needed by sum: one SSumRes accumulator.
// The function node is not inspected; always returns true.
bool getSumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
  pEnv->calcMemSize = sizeof(SSumRes);
  return true;
}

G
Ganlin Zhao 已提交
283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408
// Reports the intermediate buffer size needed by avg.
// The buffer is used as a whole SAvgRes (result + running sum + count):
// avgFunctionSetup clears sizeof(SAvgRes) bytes of it and avgFunction/avgFinalize
// read and write all three fields, so that full size must be reserved.
// The function node is not inspected; always returns true.
bool getAvgFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
  pEnv->calcMemSize = sizeof(SAvgRes);
  return true;
}

// First-time setup for avg: runs the common setup and, when that actually
// initialized the entry, clears the SAvgRes held in the intermediate buffer.
// Returns what functionSetup returned.
bool avgFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
  const bool firstInit = functionSetup(pCtx, pResultInfo);
  if (firstInit) {
    SAvgRes* pAvg = GET_ROWCELL_INTERBUF(pResultInfo);
    memset(pAvg, 0, sizeof(SAvgRes));
  }

  return firstInit;
}

/*
 * Per-type accumulation step of avgFunction. Reads the column data as an array
 * of _t and, for every non-null row of the input range, bumps numOfElem and the
 * stored count and adds the value to pAvgRes->sum._sumField.
 * Relies on the locals of avgFunction (pCol, pInput, start, numOfRows,
 * numOfElem, pAvgRes); it is undefined again right after the function.
 */
#define AVG_FOLD_ROWS(_t, _sumField)                                                \
  do {                                                                              \
    const _t* vals_ = (const _t*)pCol->pData;                                       \
    for (int32_t row_ = start; row_ < numOfRows + pInput->startRowIndex; ++row_) {  \
      if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, row_)) {               \
        continue;                                                                   \
      }                                                                             \
      numOfElem += 1;                                                               \
      pAvgRes->count += 1;                                                          \
      pAvgRes->sum._sumField += vals_[row_];                                        \
    }                                                                               \
  } while (0)

/*
 * Accumulates the non-null values of the current input rows into the SAvgRes
 * kept in the intermediate buffer (sum and count). Signed integer columns use
 * the integer sum, float/double columns the double sum; other column types are
 * ignored. Always returns TSDB_CODE_SUCCESS.
 */
int32_t avgFunction(SqlFunctionCtx* pCtx) {
  int32_t numOfElem = 0;

  SInputColumnInfoData* pInput = &pCtx->input;
  SColumnInfoData*      pCol = pInput->pData[0];
  int32_t               type = pCol->info.type;
  SAvgRes*              pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));

  // the computation always runs on the real data block
  int32_t start = pInput->startRowIndex;
  int32_t numOfRows = pInput->numOfRows;

  switch (type) {
    case TSDB_DATA_TYPE_TINYINT:
      AVG_FOLD_ROWS(int8_t, isum);
      break;
    case TSDB_DATA_TYPE_SMALLINT:
      AVG_FOLD_ROWS(int16_t, isum);
      break;
    case TSDB_DATA_TYPE_INT:
      AVG_FOLD_ROWS(int32_t, isum);
      break;
    case TSDB_DATA_TYPE_BIGINT:
      AVG_FOLD_ROWS(int64_t, isum);
      break;
    case TSDB_DATA_TYPE_FLOAT:
      AVG_FOLD_ROWS(float, dsum);
      break;
    case TSDB_DATA_TYPE_DOUBLE:
      AVG_FOLD_ROWS(double, dsum);
      break;
    default:
      break;
  }

  // when every checked value is null, no result is flagged
  SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
  return TSDB_CODE_SUCCESS;
}

#undef AVG_FOLD_ROWS

409
/*
 * Computes the average from the accumulated sum and count (integer sum for
 * integer input types, double sum otherwise), stores it in SAvgRes.result and
 * hands over to functionFinalize to emit the cell.
 */
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) {
  SAvgRes* pAvg = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
  int32_t  colType = pCtx->input.pData[0]->info.type;
  double   divisor = (double)pAvg->count;

  if (IS_INTEGER_TYPE(colType)) {
    pAvg->result = pAvg->sum.isum / divisor;
  } else {
    pAvg->result = pAvg->sum.dsum / divisor;
  }

  return functionFinalize(pCtx, pBlock, slotId);
}

422 423 424 425
// Always requests FUNC_DATA_REQUIRED_STATIS_LOAD; neither parameter is inspected.
EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow){
  return FUNC_DATA_REQUIRED_STATIS_LOAD;
}

426 427 428 429 430 431
/*
 * First-time setup for max: after the common setup, seeds the intermediate
 * buffer with the lowest value of the result type, so that larger inputs
 * replace it. Returns false when the entry was already initialized.
 */
bool maxFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
  if (!functionSetup(pCtx, pResultInfo)) {
    return false;
  }

  char* pSeed = GET_ROWCELL_INTERBUF(pResultInfo);
  switch (pCtx->resDataInfo.type) {
    // signed integers start at their minimum
    case TSDB_DATA_TYPE_TINYINT:
      *((int8_t *)pSeed) = INT8_MIN;
      break;
    case TSDB_DATA_TYPE_SMALLINT:
      *((int16_t *)pSeed) = INT16_MIN;
      break;
    case TSDB_DATA_TYPE_INT:
      *((int32_t *)pSeed) = INT32_MIN;
      break;
    case TSDB_DATA_TYPE_BIGINT:
      *((int64_t *)pSeed) = INT64_MIN;
      break;

    // unsigned integers and bool start at zero
    case TSDB_DATA_TYPE_BOOL:
      *((int8_t *)pSeed) = 0;
      break;
    case TSDB_DATA_TYPE_UTINYINT:
      *((uint8_t *)pSeed) = 0;
      break;
    case TSDB_DATA_TYPE_USMALLINT:
      *((uint16_t *)pSeed) = 0;
      break;
    case TSDB_DATA_TYPE_UINT:
      *((uint32_t *)pSeed) = 0;
      break;
    case TSDB_DATA_TYPE_UBIGINT:
      *((uint64_t *)pSeed) = 0;
      break;

    // floating point starts at the most negative finite value
    case TSDB_DATA_TYPE_FLOAT:
      *((float *)pSeed) = -FLT_MAX;
      break;
    case TSDB_DATA_TYPE_DOUBLE:
      SET_DOUBLE_VAL(((double *)pSeed), -DBL_MAX);
      break;

    default:
      assert(0);
  }

  return true;
}

/*
 * First-time setup for min: after the common setup, seeds the intermediate
 * buffer with the highest value of the result type, so that smaller inputs
 * replace it. Returns false when the entry was already initialized.
 */
bool minFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
  if (!functionSetup(pCtx, pResultInfo)) {
    return false;  // already initialized earlier, nothing to seed
  }

  char* pSeed = GET_ROWCELL_INTERBUF(pResultInfo);
  switch (pCtx->resDataInfo.type) {
    // signed integers start at their maximum
    case TSDB_DATA_TYPE_BIGINT:
      *((int64_t *)pSeed) = INT64_MAX;
      break;
    case TSDB_DATA_TYPE_INT:
      *((int32_t *)pSeed) = INT32_MAX;
      break;
    case TSDB_DATA_TYPE_SMALLINT:
      *((int16_t *)pSeed) = INT16_MAX;
      break;
    case TSDB_DATA_TYPE_TINYINT:
      *((int8_t *)pSeed) = INT8_MAX;
      break;

    // unsigned integers start at their maximum, bool at 1
    case TSDB_DATA_TYPE_UBIGINT:
      *((uint64_t *)pSeed) = UINT64_MAX;
      break;
    case TSDB_DATA_TYPE_UINT:
      *((uint32_t *)pSeed) = UINT32_MAX;
      break;
    case TSDB_DATA_TYPE_USMALLINT:
      *((uint16_t *)pSeed) = UINT16_MAX;
      break;
    case TSDB_DATA_TYPE_UTINYINT:
      *((uint8_t *)pSeed) = UINT8_MAX;
      break;
    case TSDB_DATA_TYPE_BOOL:
      *((int8_t *)pSeed) = 1;
      break;

    // floating point starts at the largest finite value
    case TSDB_DATA_TYPE_DOUBLE:
      SET_DOUBLE_VAL(((double *)pSeed), DBL_MAX);
      break;
    case TSDB_DATA_TYPE_FLOAT:
      *((float *)pSeed) = FLT_MAX;
      break;

    default:
      assert(0);
  }

  return true;
}

H
Haojun Liao 已提交
519
// Reports the intermediate buffer size needed by min/max: eight bytes
// (sizeof(int64_t)), enough for the widest value type the setup functions seed.
// The function node is not inspected; always returns true.
bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
  pEnv->calcMemSize = sizeof(int64_t);
  return true;
}


/*
 * Shared worker of minFunction and maxFunction.
 *
 * pCtx       function context; the running extreme lives in its intermediate buffer
 * isMinFunc  non-zero: keep the smaller value (min); zero: keep the larger value (max)
 *
 * Two input modes:
 *  - block statistics set: the pre-computed min/max of the block is compared with
 *    the running extreme;
 *  - otherwise: rows [startRowIndex, startRowIndex + numOfRows) are scanned,
 *    skipping nulls.
 * Whenever the running extreme is replaced, the subsidiary results are refreshed
 * with the timestamp of the winning row.
 *
 * Returns a count the callers in this file only test for "> 0". It is NOT a
 * uniform row count: the statistics path returns the non-null count (plus one
 * when a float/double extreme was replaced), the INT scan returns the non-null
 * rows, and the other scans return the number of replacements.
 */
int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) {
  int32_t numOfElems = 0;

  // the comparisons below XOR this flag with a 0/1 result, so force it to 0/1
  const int32_t sign = (isMinFunc != 0) ? 1 : 0;

  SInputColumnInfoData* pInput = &pCtx->input;
  SColumnDataAgg *pAgg = pInput->pColumnDataAgg[0];

  SColumnInfoData* pCol = pInput->pData[0];
  int32_t type = pCol->info.type;

  SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
  char* buf = GET_ROWCELL_INTERBUF(pResInfo);

  // data in current data block are qualified to the query
  if (pInput->colDataAggIsSet) {
    numOfElems = pInput->numOfRows - pAgg->numOfNull;
    ASSERT(pInput->numOfRows == pInput->totalRows && numOfElems >= 0);

    if (numOfElems == 0) {
      return numOfElems;
    }

    void*   tval = NULL;
    int16_t index = 0;

    if (sign) {
      tval  = &pAgg->min;
      index = pAgg->minIndex;
    } else {
      tval  = &pAgg->max;
      index = pAgg->maxIndex;
    }

    // the index is the original position, not the relative position
    TSKEY key = (pCtx->ptsList != NULL)? pCtx->ptsList[index]:TSKEY_INITIAL_VAL;

    if (IS_SIGNED_NUMERIC_TYPE(type)) {
      int64_t prev = 0;
      GET_TYPED_DATA(prev, int64_t, type, buf);

      int64_t val = GET_INT64_VAL(tval);
      if ((prev < val) ^ sign) {
        // NOTE(review): stores 8 bytes whatever the column width; this fits the
        // 8-byte buffer reserved by getMinmaxFuncEnv - confirm the narrower reads
        // elsewhere are happy with that layout.
        *(int64_t*) buf = val;
        DO_UPDATE_SUBSID_RES(pCtx, key);
      }
    } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
      uint64_t prev = 0;
      GET_TYPED_DATA(prev, uint64_t, type, buf);

      uint64_t val = GET_UINT64_VAL(tval);
      if ((prev < val) ^ sign) {
        *(uint64_t*) buf = val;
        DO_UPDATE_SUBSID_RES(pCtx, key);
      }
    } else if (type == TSDB_DATA_TYPE_DOUBLE) {
      double  val = GET_DOUBLE_VAL(tval);
      UPDATE_DATA(pCtx, *(double*) buf, val, numOfElems, sign, key);
    } else if (type == TSDB_DATA_TYPE_FLOAT) {
      double val = GET_DOUBLE_VAL(tval);
      UPDATE_DATA(pCtx, *(float*) buf, val, numOfElems, sign, key);
    }

    return numOfElems;
  }

  int32_t start = pInput->startRowIndex;
  int32_t numOfRows = pInput->numOfRows;

  if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL) {
    if (type == TSDB_DATA_TYPE_TINYINT || type == TSDB_DATA_TYPE_BOOL) {
      LOOPCHECK_N(*(int8_t*)buf, pCol, pCtx, int8_t, numOfRows, start, sign, numOfElems);
    } else if (type == TSDB_DATA_TYPE_SMALLINT) {
      LOOPCHECK_N(*(int16_t*) buf, pCol, pCtx, int16_t, numOfRows, start, sign, numOfElems);
    } else if (type == TSDB_DATA_TYPE_INT) {
      int32_t *pData = (int32_t*)pCol->pData;
      int32_t *val = (int32_t*) buf;

      for (int32_t i = start; i < start + numOfRows; ++i) {
        if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) {
          continue;
        }

        if ((*val < pData[i]) ^ sign) {
          *val = pData[i];
          TSKEY ts = (pCtx->ptsList != NULL)? GET_TS_DATA(pCtx, i) : 0;
          DO_UPDATE_SUBSID_RES(pCtx, ts);
        }

        // unlike LOOPCHECK_N, this branch counts every non-null row
        numOfElems += 1;
      }

#if defined(_DEBUG_VIEW)
      qDebug("max value updated:%d", *val);
#endif
    } else if (type == TSDB_DATA_TYPE_BIGINT) {
      LOOPCHECK_N(*(int64_t*) buf, pCol, pCtx, int64_t, numOfRows, start, sign, numOfElems);
    }
  } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
    if (type == TSDB_DATA_TYPE_UTINYINT) {
      LOOPCHECK_N(*(uint8_t*) buf, pCol, pCtx, uint8_t, numOfRows, start, sign, numOfElems);
    } else if (type == TSDB_DATA_TYPE_USMALLINT) {
      LOOPCHECK_N(*(uint16_t*) buf, pCol, pCtx, uint16_t, numOfRows, start, sign, numOfElems);
    } else if (type == TSDB_DATA_TYPE_UINT) {
      LOOPCHECK_N(*(uint32_t*) buf, pCol, pCtx, uint32_t, numOfRows, start, sign, numOfElems);
    } else if (type == TSDB_DATA_TYPE_UBIGINT) {
      LOOPCHECK_N(*(uint64_t*) buf, pCol, pCtx, uint64_t, numOfRows, start, sign, numOfElems);
    }
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    LOOPCHECK_N(*(double*) buf, pCol, pCtx, double, numOfRows, start, sign, numOfElems);
  } else if (type == TSDB_DATA_TYPE_FLOAT) {
    LOOPCHECK_N(*(float*) buf, pCol, pCtx, float, numOfRows, start, sign, numOfElems);
  }

  return numOfElems;
}
655

H
Haojun Liao 已提交
656
// min aggregate step: runs the shared helper in "keep the smaller value" mode and
// flags one result when the helper reports a positive count.
// Always returns TSDB_CODE_SUCCESS.
int32_t minFunction(SqlFunctionCtx *pCtx) {
  const int32_t handled = doMinMaxHelper(pCtx, 1);

  SET_VAL(GET_RES_INFO(pCtx), handled, 1);
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
662
// max aggregate step: runs the shared helper in "keep the larger value" mode and
// flags one result when the helper reports a positive count.
// Always returns TSDB_CODE_SUCCESS.
int32_t maxFunction(SqlFunctionCtx *pCtx) {
  const int32_t handled = doMinMaxHelper(pCtx, 0);

  SET_VAL(GET_RES_INFO(pCtx), handled, 1);
  return TSDB_CODE_SUCCESS;
}

// Reports the intermediate buffer size needed by stddev: one SStddevRes.
// pFunc is not inspected; always returns true.
bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
  pEnv->calcMemSize = sizeof(SStddevRes);
  return true;
}

H
Haojun Liao 已提交
673 674 675 676 677 678 679 680 681 682
// First-time setup for stddev: runs the common setup and, when that actually
// initialized the entry, clears the SStddevRes held in the intermediate buffer.
// Returns false when the entry was already initialized.
bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
  if (functionSetup(pCtx, pResultInfo)) {
    SStddevRes* pState = GET_ROWCELL_INTERBUF(pResultInfo);
    memset(pState, 0, sizeof(SStddevRes));
    return true;
  }

  return false;
}

H
Haojun Liao 已提交
683
/*
 * Per-type accumulation step of stddevFunction. Reads the column data as an
 * array of _t and, for every non-null row of the input range, bumps numOfElem
 * and the stored count, adds the value to pStddevRes->_sumField and its square
 * to pStddevRes->_sqSumField.
 *
 * Each value is widened to _wide BEFORE it is squared. Squaring in the element
 * type would overflow: int32_t * int32_t is evaluated in int (undefined
 * behaviour beyond about +/-46340), and float * float loses precision and
 * saturates to infinity long before the double accumulator would.
 *
 * Relies on the locals of stddevFunction (pCol, start, numOfRows, numOfElem,
 * pStddevRes); it is undefined again right after the function.
 */
#define STDDEV_FOLD_ROWS(_t, _wide, _sumField, _sqSumField)              \
  do {                                                                   \
    const _t* vals_ = (const _t*)pCol->pData;                            \
    for (int32_t row_ = start; row_ < numOfRows + start; ++row_) {       \
      if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, row_)) {    \
        continue;                                                        \
      }                                                                  \
      const _wide v_ = (_wide)vals_[row_];                               \
      numOfElem += 1;                                                    \
      pStddevRes->count += 1;                                            \
      pStddevRes->_sumField += v_;                                       \
      pStddevRes->_sqSumField += v_ * v_;                                \
    }                                                                    \
  } while (0)

/*
 * Accumulates count, sum and sum of squares of the non-null values of the
 * current input rows into the SStddevRes kept in the intermediate buffer.
 * Signed integer columns use the int64 accumulators, float/double columns the
 * double accumulators; other column types are ignored.
 * Always returns TSDB_CODE_SUCCESS.
 *
 * NOTE(review): for BIGINT input the square itself can still exceed int64_t;
 * the accumulator type in SStddevRes would have to change to address that.
 */
int32_t stddevFunction(SqlFunctionCtx* pCtx) {
  int32_t numOfElem = 0;

  SInputColumnInfoData* pInput = &pCtx->input;
  SColumnInfoData*      pCol = pInput->pData[0];
  int32_t               type = pCol->info.type;

  SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));

  // the computation always runs on the real data block
  int32_t start = pInput->startRowIndex;
  int32_t numOfRows = pInput->numOfRows;

  switch (type) {
    case TSDB_DATA_TYPE_TINYINT:
      STDDEV_FOLD_ROWS(int8_t, int64_t, isum, quadraticISum);
      break;
    case TSDB_DATA_TYPE_SMALLINT:
      STDDEV_FOLD_ROWS(int16_t, int64_t, isum, quadraticISum);
      break;
    case TSDB_DATA_TYPE_INT:
      STDDEV_FOLD_ROWS(int32_t, int64_t, isum, quadraticISum);
      break;
    case TSDB_DATA_TYPE_BIGINT:
      STDDEV_FOLD_ROWS(int64_t, int64_t, isum, quadraticISum);
      break;
    case TSDB_DATA_TYPE_FLOAT:
      STDDEV_FOLD_ROWS(float, double, dsum, quadraticDSum);
      break;
    case TSDB_DATA_TYPE_DOUBLE:
      STDDEV_FOLD_ROWS(double, double, dsum, quadraticDSum);
      break;
    default:
      break;
  }

  // when every checked value is null, no result is flagged
  SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
  return TSDB_CODE_SUCCESS;
}

#undef STDDEV_FOLD_ROWS

800
/*
 * Computes the population standard deviation from the accumulated count, sum
 * and sum of squares, as sqrt(E[x^2] - E[x]^2), stores it in SStddevRes.result
 * and hands over to functionFinalize to emit the cell.
 *
 * E[x^2] - E[x]^2 is mathematically >= 0, but floating-point rounding can leave
 * a tiny negative value when all inputs are (nearly) equal; sqrt would then
 * return NaN. Such values are clamped to 0 first. With a zero count the
 * divisions yield NaN, which the clamp leaves alone (the comparison is false).
 */
int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) {
  SInputColumnInfoData* pInput = &pCtx->input;
  int32_t type = pInput->pData[0]->info.type;
  SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));

  const double count = (double)pStddevRes->count;
  double avg;
  double meanOfSquares;
  if (IS_INTEGER_TYPE(type)) {
    avg = pStddevRes->isum / count;
    meanOfSquares = pStddevRes->quadraticISum / count;
  } else {
    avg = pStddevRes->dsum / count;
    meanOfSquares = pStddevRes->quadraticDSum / count;
  }

  double variance = meanOfSquares - avg * avg;
  if (variance < 0) {
    variance = 0;  // rounding noise only; a true variance is never negative
  }
  pStddevRes->result = sqrt(variance);

  return functionFinalize(pCtx, pBlock, slotId);
}

// Reports the intermediate buffer size needed by percentile: one SPercentileInfo.
// pFunc is not inspected; always returns true.
bool getPercentileFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
  pEnv->calcMemSize = sizeof(SPercentileInfo);
  return true;
}

// Prepares the PERCENTILE interim buffer for the first (min/max collecting) round:
// the running minimum starts at DBL_MAX, the running maximum at -DBL_MAX and the
// element counter at zero. Returns false when the generic functionSetup() fails.
bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
  if (functionSetup(pCtx, pResultInfo)) {
    SPercentileInfo *pPercentile = GET_ROWCELL_INTERBUF(pResultInfo);

    pPercentile->numOfElems = 0;
    SET_DOUBLE_VAL(&pPercentile->minval, DBL_MAX);
    SET_DOUBLE_VAL(&pPercentile->maxval, -DBL_MAX);
    return true;
  }

  return false;
}

H
Haojun Liao 已提交
835
/**
 * Consume one input block for PERCENTILE. Processing happens in two stages,
 * tracked by SPercentileInfo::stage:
 *
 *  - stage 0: only the minimum, the maximum and the number of non-null values
 *             are collected, either from the block's pre-computed statistics
 *             or by visiting every row;
 *  - stage 1: entered on the first call made with currentStage == REPEAT_SCAN.
 *             A tMemBucket is created from the collected min/max and every
 *             non-null value of the block is put into it.
 *
 * @return 0 while in stage 0 or when no non-null value was seen at all,
 *         TSDB_CODE_SUCCESS after a block was fed into the bucket.
 */
int32_t percentileFunction(SqlFunctionCtx *pCtx) {
  int32_t notNullElems = 0;
  SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);

  SInputColumnInfoData* pInput = &pCtx->input;

  // Fetch the pre-computed statistics only when the block carries them, the same
  // way firstFunction()/lastFunction() do; pColumnDataAgg is not touched otherwise.
  SColumnDataAgg *pAgg = (pInput->colDataAggIsSet) ? pInput->pColumnDataAgg[0] : NULL;

  SColumnInfoData *pCol = pInput->pData[0];
  int32_t type = pCol->info.type;

  SPercentileInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
  if (pCtx->currentStage == REPEAT_SCAN && pInfo->stage == 0) {
    pInfo->stage += 1;

    // all data are null, set it completed
    if (pInfo->numOfElems == 0) {
      pResInfo->complete = true;
      return 0;
    } else {
      // NOTE(review): the result of tMemBucketCreate() is not checked here;
      // confirm whether it can return NULL and how that should be reported.
      pInfo->pMemBucket = tMemBucketCreate(pCtx->inputBytes, pCtx->inputType, pInfo->minval, pInfo->maxval);
    }
  }

  // the first stage, only acquire the min/max value
  if (pInfo->stage == 0) {
    if (pAgg != NULL) {
      // use the block statistics; types that match none of the branches contribute 0/0
      double tmin = 0.0, tmax = 0.0;
      if (IS_SIGNED_NUMERIC_TYPE(type)) {
        tmin = (double)GET_INT64_VAL(&pAgg->min);
        tmax = (double)GET_INT64_VAL(&pAgg->max);
      } else if (IS_FLOAT_TYPE(type)) {
        tmin = GET_DOUBLE_VAL(&pAgg->min);
        tmax = GET_DOUBLE_VAL(&pAgg->max);
      } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
        tmin = (double)GET_UINT64_VAL(&pAgg->min);
        tmax = (double)GET_UINT64_VAL(&pAgg->max);
      }

      if (GET_DOUBLE_VAL(&pInfo->minval) > tmin) {
        SET_DOUBLE_VAL(&pInfo->minval, tmin);
      }

      if (GET_DOUBLE_VAL(&pInfo->maxval) < tmax) {
        SET_DOUBLE_VAL(&pInfo->maxval, tmax);
      }

      pInfo->numOfElems += (pInput->numOfRows - pAgg->numOfNull);
    } else {
      // check the valid data one by one
      int32_t start = pInput->startRowIndex;
      for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
        if (colDataIsNull_f(pCol->nullbitmap, i)) {
          continue;
        }

        char *data = colDataGetData(pCol, i);

        double v = 0;
        GET_TYPED_DATA(v, double, pCtx->inputType, data);
        if (v < GET_DOUBLE_VAL(&pInfo->minval)) {
          SET_DOUBLE_VAL(&pInfo->minval, v);
        }

        if (v > GET_DOUBLE_VAL(&pInfo->maxval)) {
          SET_DOUBLE_VAL(&pInfo->maxval, v);
        }

        pInfo->numOfElems += 1;
      }
    }

    return 0;
  }

  // the second stage, calculate the true percentile value
  int32_t start = pInput->startRowIndex;
  for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
    if (colDataIsNull_f(pCol->nullbitmap, i)) {
      continue;
    }

    char *data = colDataGetData(pCol, i);

    notNullElems += 1;
    tMemBucketPut(pInfo->pMemBucket, data, 1);
  }

  SET_VAL(pResInfo, notNullElems, 1);
  return TSDB_CODE_SUCCESS;
}

926
/**
 * Finalize PERCENTILE: evaluate the requested percentile on the bucket built
 * by percentileFunction(), release the bucket and hand over to functionFinalize().
 *
 * The requested percentile is the second function parameter (param[1]); it is
 * read from the variant according to its numeric type.
 *
 * @return the value returned by functionFinalize()
 */
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) {
  SVariant* pVal = &pCtx->param[1].param;

  // Pick the variant member that matches the parameter type. Previously only
  // TSDB_DATA_TYPE_INT was read as an integer, so any other integer type was
  // reinterpreted through the double member.
  double v;
  if (IS_SIGNED_NUMERIC_TYPE(pVal->nType)) {
    v = (double)pVal->i;
  } else if (IS_UNSIGNED_NUMERIC_TYPE(pVal->nType)) {
    v = (double)pVal->u;
  } else {
    v = pVal->d;
  }

  SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
  SPercentileInfo* ppInfo = (SPercentileInfo *) GET_ROWCELL_INTERBUF(pResInfo);

  tMemBucket * pMemBucket = ppInfo->pMemBucket;
  if (pMemBucket != NULL && pMemBucket->total > 0) {  // check for null
    SET_DOUBLE_VAL(&ppInfo->result, getPercentile(pMemBucket, v));
  }

  tMemBucketDestroy(pMemBucket);
  // do not leave a dangling pointer behind in the interim buffer
  ppInfo->pMemBucket = NULL;

  return functionFinalize(pCtx, pBlock, slotId);
}
H
Haojun Liao 已提交
941

H
Haojun Liao 已提交
942 943
// Reports the interim-buffer size for FIRST/LAST: the result width of the first
// parameter plus one int64_t slot.
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
  SColumnNode* pColumn = nodesListGetNode(pFunc->pParameterList, 0);

  pEnv->calcMemSize = sizeof(int64_t) + pColumn->node.resType.bytes;
  return true;
}

948 949 950 951 952 953 954 955
// Returns the timestamp stored at rowIndex of the given timestamp column,
// or 0 when no timestamp column is available.
static FORCE_INLINE TSKEY getRowPTs(SColumnInfoData* pTsColInfo, int32_t rowIndex) {
  return (pTsColInfo != NULL) ? *(TSKEY*) colDataGetData(pTsColInfo, rowIndex) : 0;
}

956 957
// This ordinary first function does not care if current scan is ascending order or descending order scan
// the OPTIMIZED version of first function will only handle the ascending order scan
H
Haojun Liao 已提交
958
int32_t firstFunction(SqlFunctionCtx *pCtx) {
H
Haojun Liao 已提交
959 960
  int32_t numOfElems = 0;

H
Haojun Liao 已提交
961
  SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
H
Haojun Liao 已提交
962 963 964 965 966
  char* buf = GET_ROWCELL_INTERBUF(pResInfo);

  SInputColumnInfoData* pInput = &pCtx->input;
  SColumnInfoData* pInputCol = pInput->pData[0];

967 968
  int32_t bytes = pInputCol->info.bytes;

H
Haojun Liao 已提交
969
  // All null data column, return directly.
H
Haojun Liao 已提交
970
  if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) {
H
Haojun Liao 已提交
971
    ASSERT(pInputCol->hasNull == true);
H
Haojun Liao 已提交
972
    return 0;
H
Haojun Liao 已提交
973 974
  }

975 976
  SColumnDataAgg* pColAgg = (pInput->colDataAggIsSet)? pInput->pColumnDataAgg[0]:NULL;

977 978
  TSKEY startKey = getRowPTs(pInput->pPTS, 0);
  TSKEY endKey = getRowPTs(pInput->pPTS, pInput->totalRows - 1);
979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995

  int32_t blockDataOrder = (startKey <= endKey)? TSDB_ORDER_ASC:TSDB_ORDER_DESC;

  if (blockDataOrder == TSDB_ORDER_ASC) {
    // filter according to current result firstly
    if (pResInfo->numOfRes > 0) {
      TSKEY ts = *(TSKEY*)(buf + bytes);
      if (ts < startKey) {
        return TSDB_CODE_SUCCESS;
      }
    }

    for (int32_t i = pInput->startRowIndex; i < pInput->startRowIndex + pInput->numOfRows; ++i) {
      if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
        continue;
      }

996 997
      numOfElems++;

998
      char* data = colDataGetData(pInputCol, i);
999
      TSKEY cts = getRowPTs(pInput->pPTS, i);
1000

1001
      if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) > cts) {
1002 1003 1004
        memcpy(buf, data, bytes);
        *(TSKEY*)(buf + bytes) = cts;
//        DO_UPDATE_TAG_COLUMNS(pCtx, ts);
1005 1006

        pResInfo->numOfRes = 1;
1007
        break;
1008 1009 1010 1011 1012 1013 1014 1015 1016 1017
      }
    }
  } else {
    // in case of descending order time stamp serial, which usually happens as the results of the nest query,
    // all data needs to be check.
    if (pResInfo->numOfRes > 0) {
      TSKEY ts = *(TSKEY*)(buf + bytes);
      if (ts < endKey) {
        return TSDB_CODE_SUCCESS;
      }
H
Haojun Liao 已提交
1018 1019
    }

1020 1021 1022 1023 1024
    for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) {
      if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
        continue;
      }

1025 1026
      numOfElems++;

1027
      char* data = colDataGetData(pInputCol, i);
1028
      TSKEY cts = getRowPTs(pInput->pPTS, i);
1029

1030
      if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) > cts) {
1031 1032 1033
        memcpy(buf, data, bytes);
        *(TSKEY*)(buf + bytes) = cts;
//        DO_UPDATE_TAG_COLUMNS(pCtx, ts);
1034
        pResInfo->numOfRes = 1;
1035
        break;
1036 1037
      }
    }
H
Haojun Liao 已提交
1038 1039 1040
  }

  SET_VAL(pResInfo, numOfElems, 1);
wmmhello's avatar
wmmhello 已提交
1041
  return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
1042 1043
}

H
Haojun Liao 已提交
1044
// Keeps the value with the largest timestamp seen so far. Per block only the
// first non-null row, counted from the end that carries the largest timestamp,
// is examined.
//
// The interim buffer holds the chosen value (`bytes` bytes) immediately followed by
// its timestamp. The timestamp slot sits at an arbitrary offset, so it is read and
// written with memcpy() rather than through a casted TSKEY pointer, which would be a
// misaligned access for column widths that are not a multiple of sizeof(TSKEY).
int32_t lastFunction(SqlFunctionCtx *pCtx) {
  int32_t numOfElems = 0;

  SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
  char* buf = GET_ROWCELL_INTERBUF(pResInfo);

  SInputColumnInfoData* pInput = &pCtx->input;
  SColumnInfoData* pInputCol = pInput->pData[0];

  int32_t bytes = pInputCol->info.bytes;

  // All null data column, return directly.
  if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) {
    ASSERT(pInputCol->hasNull == true);
    return 0;
  }

  SColumnDataAgg* pColAgg = (pInput->colDataAggIsSet)? pInput->pColumnDataAgg[0]:NULL;

  TSKEY startKey = getRowPTs(pInput->pPTS, 0);
  TSKEY endKey = getRowPTs(pInput->pPTS, pInput->totalRows - 1);

  int32_t blockDataOrder = (startKey <= endKey)? TSDB_ORDER_ASC:TSDB_ORDER_DESC;
  bool    ascending = (blockDataOrder == TSDB_ORDER_ASC);

  // Timestamp of the value kept so far; only meaningful when a result already exists.
  TSKEY curTs = 0;
  if (pResInfo->numOfRes > 0) {
    memcpy(&curTs, buf + bytes, sizeof(TSKEY));
  }

  // Visit the rows from the largest timestamp towards the smallest one:
  // back to front for an ascending block, front to back for a descending one.
  int32_t firstRow = pInput->startRowIndex;
  int32_t lastRow  = pInput->startRowIndex + pInput->numOfRows - 1;
  int32_t step     = ascending ? -1 : 1;

  for (int32_t i = (ascending ? lastRow : firstRow); i >= firstRow && i <= lastRow; i += step) {
    if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
      continue;
    }

    numOfElems++;

    char* data = colDataGetData(pInputCol, i);
    TSKEY cts = getRowPTs(pInput->pPTS, i);
    if (pResInfo->numOfRes == 0 || curTs < cts) {
      memcpy(buf, data, bytes);
      memcpy(buf + bytes, &cts, sizeof(TSKEY));
      //        DO_UPDATE_TAG_COLUMNS(pCtx, ts);
      pResInfo->numOfRes = 1;
    }

    // the first non-null row decides, whether or not it replaced the kept value
    break;
  }

  SET_VAL(pResInfo, numOfElems, 1);
  return TSDB_CODE_SUCCESS;
}
H
Haojun Liao 已提交
1109

H
Haojun Liao 已提交
1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120
// Reports the interim-buffer size DIFF needs: one SDiffInfo. Always succeeds.
bool getDiffFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
  pEnv->calcMemSize = sizeof(SDiffInfo);

  return true;
}

// Resets the DIFF interim state: no previous value recorded yet and every
// option flag switched off. Returns false when the generic functionSetup() fails.
bool diffFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo) {
  if (functionSetup(pCtx, pResInfo)) {
    SDiffInfo* pState = GET_ROWCELL_INTERBUF(pResInfo);

    pState->prev.i64 = 0;
    pState->hasPrev  = false;
    pState->firstOutput = false;
    pState->includeNull = false;
    pState->ignoreNegative = false; // TODO set correct param
    return true;
  }

  return false;
}

/**
 * DIFF: for every non-null input value emit the difference to the previous
 * non-null value. The previous value is carried across blocks in SDiffInfo, so
 * the very first value seen produces no output row.
 *
 * Only TSDB_DATA_TYPE_INT and TSDB_DATA_TYPE_BIGINT are handled.
 * TODO: FLOAT, DOUBLE, SMALLINT and TINYINT are not implemented yet.
 *
 * When both a timestamp output column and an input timestamp column are
 * available, the timestamp of each emitted row is written alongside.
 *
 * @return the number of output rows produced by this block, 0 when nothing
 *         could be produced.
 */
int32_t diffFunction(SqlFunctionCtx *pCtx) {
  SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
  SDiffInfo *pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo);

  SInputColumnInfoData* pInput = &pCtx->input;
  SColumnInfoData* pInputCol = pInput->pData[0];

  bool  isFirstBlock = (pDiffInfo->hasPrev == false);
  int32_t numOfElems = 0;

  // NOTE(review): the loops below start at startRowIndex and advance by `step`;
  // if GET_FORWARD_DIRECTION_FACTOR can yield a negative step for descending
  // order, the index would run below the block start. Confirm against the macro.
  int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);

  SColumnInfoData* pTsOutput = pCtx->pTsOutput;
  // the input timestamp column may be absent, do not dereference it blindly
  TSKEY* tsList = (pInput->pPTS != NULL) ? (TSKEY*)pInput->pPTS->pData : NULL;
  // timestamps are copied only when both the source and the destination exist
  bool   writeTs = (pTsOutput != NULL && tsList != NULL);

  int32_t startOffset = pCtx->offset;
  int32_t endRow = pInput->numOfRows + pInput->startRowIndex;

  SColumnInfoData *pOutput = (SColumnInfoData *)pCtx->pOutput;

  switch (pInputCol->info.type) {
    case TSDB_DATA_TYPE_INT: {
      for (int32_t i = pInput->startRowIndex; i < endRow; i += step) {
        // in the first block the first value only seeds `prev`, hence the -1
        int32_t pos = startOffset + (isFirstBlock? (numOfElems-1):numOfElems);
        if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
          if (pDiffInfo->includeNull) {
            colDataSetNull_f(pOutput->nullbitmap, pos);
            if (writeTs) {
              colDataAppendInt64(pTsOutput, pos, &tsList[i]);
            }

            numOfElems += 1;
          }
          continue;
        }

        int32_t v = *(int32_t*) colDataGetData(pInputCol, i);
        if (pDiffInfo->hasPrev) {
          int32_t delta = (int32_t)(v - pDiffInfo->prev.i64);  // direct previous may be null
          if (delta < 0 && pDiffInfo->ignoreNegative) {
            colDataSetNull_f(pOutput->nullbitmap, pos);
          } else {
            colDataAppendInt32(pOutput, pos, &delta);
          }

          if (writeTs) {
            colDataAppendInt64(pTsOutput, pos, &tsList[i]);
          }
        }

        pDiffInfo->prev.i64 = v;
        pDiffInfo->hasPrev  = true;
        numOfElems++;
      }
      break;
    }

    case TSDB_DATA_TYPE_BIGINT: {
      // Same procedure as the INT case, with 64-bit values. The value is read
      // for every non-null row (not only once a previous value exists), so the
      // first value correctly seeds `prev`, and it is kept in an int64_t so it
      // is not truncated.
      for (int32_t i = pInput->startRowIndex; i < endRow; i += step) {
        int32_t pos = startOffset + (isFirstBlock? (numOfElems-1):numOfElems);
        if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
          if (pDiffInfo->includeNull) {
            colDataSetNull_f(pOutput->nullbitmap, pos);
            if (writeTs) {
              colDataAppendInt64(pTsOutput, pos, &tsList[i]);
            }

            numOfElems += 1;
          }
          continue;
        }

        int64_t v = *(int64_t*) colDataGetData(pInputCol, i);
        if (pDiffInfo->hasPrev) {
          int64_t delta = v - pDiffInfo->prev.i64;  // direct previous may be null
          if (delta < 0 && pDiffInfo->ignoreNegative) {
            colDataSetNull_f(pOutput->nullbitmap, pos);
          } else {
            colDataAppendInt64(pOutput, pos, &delta);
          }

          if (writeTs) {
            colDataAppendInt64(pTsOutput, pos, &tsList[i]);
          }
        }

        pDiffInfo->prev.i64 = v;
        pDiffInfo->hasPrev  = true;
        numOfElems++;
      }
      break;
    }

    default:
      break;
//      qError("error input type");
  }

  // initial value is not set yet
  if (!pDiffInfo->hasPrev || numOfElems <= 0) {
    /*
     * 1. current block and blocks before are full of null
     * 2. current block may be null value
     */
    assert(pCtx->hasNull);
    return 0;
  }

  // the first value of the first block produced no output row
  return (isFirstBlock) ? numOfElems - 1 : numOfElems;
}
H
Haojun Liao 已提交
1341

1342 1343
// Reports the interim-buffer size for TOP/BOTTOM: the STopBotRes header followed
// by one STopBotResItem per requested row (the second function parameter).
bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
  SValueNode* pLimit = (SValueNode*) nodesListGetNode(pFunc->pParameterList, 1);

  pEnv->calcMemSize = pLimit->datum.i * sizeof(STopBotResItem) + sizeof(STopBotRes);
  return true;
}

// Returns the TOP/BOTTOM interim state of pCtx with pItems pointing at the
// item array stored directly behind the STopBotRes header.
static STopBotRes *getTopBotOutputInfo(SqlFunctionCtx *pCtx) {
  STopBotRes* pTopBot = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));

  // the items start right after the header
  pTopBot->pItems = (STopBotResItem*)(pTopBot + 1);
  return pTopBot;
}

1356 1357
// Forward declaration: topFunction() calls this before its definition.
// Offers one value (pData, of the given column type) to the result set in pRes,
// which holds at most maxSize items; pEntryInfo->numOfRes is the current item count.
static void doAddIntoResult(STopBotRes* pRes, int32_t maxSize, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock,
                            uint16_t type, uint64_t uid, SResultRowEntryInfo* pEntryInfo);
1358

1359
// TOP: offers every non-null value of the input block to the result set kept in
// the interim buffer. The capacity of the result set is the second function
// parameter. Always returns TSDB_CODE_SUCCESS.
int32_t topFunction(SqlFunctionCtx *pCtx) {
  SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
  STopBotRes *pRes = getTopBotOutputInfo(pCtx);

  SInputColumnInfoData* pInput = &pCtx->input;
  SColumnInfoData* pCol = pInput->pData[0];
  int32_t type = pCol->info.type;

  int32_t firstRow = pInput->startRowIndex;
  int32_t endRow = pInput->numOfRows + firstRow;

  for (int32_t row = firstRow; row < endRow; ++row) {
    bool isNull = pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, row);
    if (!isNull) {
      char* data = colDataGetData(pCol, row);
      // no source block is handed over yet, hence the NULL
      doAddIntoResult(pRes, pCtx->param[1].param.i, data, row, NULL, type, pInput->uid, pResInfo);
    }
  }

  return TSDB_CODE_SUCCESS;
}

static int32_t topBotResComparFn(const void *p1, const void *p2, const void *param) {
  uint16_t type = *(uint16_t *) param;

  STopBotResItem  *val1 = (STopBotResItem *) p1;
  STopBotResItem  *val2 = (STopBotResItem *) p2;

  if (IS_SIGNED_NUMERIC_TYPE(type)) {
    if (val1->v.i == val2->v.i) {
      return 0;
    }

    return (val1->v.i > val2->v.i) ? 1 : -1;
  } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
    if (val1->v.u == val2->v.u) {
      return 0;
    }

    return (val1->v.u > val2->v.u) ? 1 : -1;
  }

  if (val1->v.d == val2->v.d) {
    return 0;
  }

  return (val1->v.d > val2->v.d) ? 1 : -1;
}

1417 1418 1419

/**
 * Offer one value to the TOP/BOTTOM result set.
 *
 * While fewer than maxSize items are stored the value is appended and the item
 * array is re-sorted with taosheapsort(). Once the set is full, the value
 * replaces pItems[0] when it is greater than that item, and the array is
 * re-adjusted with taosheapadjust().
 *
 * @param pRes       result set; pRes->pItems must already point at the item array
 * @param maxSize    capacity of the result set; nothing is stored when it is <= 0
 * @param pData      raw value, interpreted according to `type`
 * @param rowIndex   row of the value inside pSrcBlock
 * @param pSrcBlock  source block of the row, may be NULL
 * @param type       column data type of the value
 * @param uid        table uid recorded with the item
 * @param pEntryInfo result entry; numOfRes is the number of stored items
 */
void doAddIntoResult(STopBotRes *pRes, int32_t maxSize, void *pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
    uint64_t uid, SResultRowEntryInfo* pEntryInfo) {
  // without any capacity there is no slot to write to, not even pItems[0]
  if (maxSize <= 0) {
    return;
  }

  SVariant val = {0};
  taosVariantCreateFromBinary(&val, pData, tDataTypes[type].bytes, type);

  STopBotResItem *pItems = pRes->pItems;
  assert(pItems != NULL);

  // not full yet
  if (pEntryInfo->numOfRes < maxSize) {
    STopBotResItem* pItem = &pItems[pEntryInfo->numOfRes];
    pItem->v   = val;
    pItem->uid = uid;
    pItem->tuplePos.pageId = -1;  // todo set the corresponding tuple data in the disk-based buffer

    // Unfinished: keeping the whole source row in a buffer page. It is skipped
    // when the caller supplies no source block or no page could be obtained,
    // both of which were dereferenced unconditionally before.
    if (pRes->pageId == -1 && pSrcBlock != NULL) {
      SFilePage* pPage = getNewBufPage(NULL, 0, &pRes->pageId);
      if (pPage != NULL) {
        pPage->num = sizeof(SFilePage);

        // todo keep the current row data; the column values are only looked up for now
        for(int32_t i = 0; i < pSrcBlock->info.numOfCols; ++i) {
          SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, i);
          (void) colDataIsNull_s(pCol, rowIndex);
          (void) colDataGetData(pCol, rowIndex);
        }
      }
    }

    pEntryInfo->numOfRes++;
    taosheapsort((void *) pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void *) &type, topBotResComparFn, false);
  } else { // replace the minimum value in the result
    bool greater = (IS_SIGNED_NUMERIC_TYPE(type) && val.i > pItems[0].v.i) ||
                   (IS_UNSIGNED_NUMERIC_TYPE(type) && val.u > pItems[0].v.u) ||
                   (IS_FLOAT_TYPE(type) && val.d > pItems[0].v.d);
    if (greater) {
      STopBotResItem* pItem = &pItems[0];
      pItem->v   = val;
      pItem->uid = uid;
      pItem->tuplePos.pageId = -1;  // todo set the corresponding tuple data in the disk-based buffer

      taosheapadjust((void *) pItems, sizeof(STopBotResItem), 0, pEntryInfo->numOfRes - 1, (const void *) &type, topBotResComparFn, NULL, false);
    }
  }
}
1464

1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491
/**
 * Finalize TOP/BOTTOM: append every stored item to column `slotId` of pBlock,
 * starting at row pBlock->info.rows, and mark the result entry complete.
 *
 * Only TSDB_DATA_TYPE_INT input is written so far; other types produce no output.
 *
 * @return the number of stored items (pEntryInfo->numOfRes)
 */
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) {
  SResultRowEntryInfo *pEntryInfo = GET_RES_INFO(pCtx);

  // Re-anchor pItems behind the header, like topFunction() does, instead of
  // trusting the pointer value stored in the interim buffer.
  STopBotRes* pRes = getTopBotOutputInfo(pCtx);
  pEntryInfo->complete = true;

  int32_t type = pCtx->input.pData[0]->info.type;
  SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);

  // todo assign the tag value and the corresponding row data
  int32_t currentRow = pBlock->info.rows;
  switch(type) {
    case TSDB_DATA_TYPE_INT: {
      for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) {
        STopBotResItem* pItem = &pRes->pItems[i];

        // convert by value; reinterpreting the address of the wider variant
        // member as int32_t* would depend on the byte order
        int32_t v = (int32_t)pItem->v.i;
        colDataAppendInt32(pCol, currentRow++, &v);

        // todo: when pItem->tuplePos.pageId != -1, load the row kept at
        // pItem->tuplePos (pageId/offset) and emit it as well
      }
      break;
    }

    default:
      // todo: the remaining data types are not written yet
      break;
  }

  return pEntryInfo->numOfRes;
}