builtinsimpl.c 56.6 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "builtinsimpl.h"
17
#include "function.h"
18
#include "querynodes.h"
H
Haojun Liao 已提交
19 20
#include "taggfunction.h"
#include "tdatablock.h"
21
#include "tpercentile.h"
H
Haojun Liao 已提交
22

G
Ganlin Zhao 已提交
23 24 25 26 27 28 29 30 31 32 33 34 35 36
/*
 * Running total used by SUM() and AVG(). Exactly one member is live at a time,
 * chosen by the input column type: isum for signed integers (SUM also routes
 * BOOL here), usum for unsigned integers, dsum for FLOAT/DOUBLE.
 */
typedef struct SSumRes {
  union {
    int64_t  isum;  /* signed integer total */
    uint64_t usum;  /* unsigned integer total */
    double   dsum;  /* floating-point total */
  };
} SSumRes;

/*
 * Intermediate state of AVG(): the quotient written at finalize time, the typed
 * running sum, and the number of non-NULL values folded into that sum.
 */
typedef struct SAvgRes {
  double  result;  /* sum / count, filled in by avgFinalize() */
  SSumRes sum;     /* running total of the non-NULL inputs */
  int64_t count;   /* how many non-NULL inputs were accumulated */
} SAvgRes;

37 38
/* One candidate row retained by TOP()/BOTTOM(). */
typedef struct STopBotResItem {
  SVariant v;    /* candidate value; NOTE(review): presumably the ranked column value — confirm */
  uint64_t uid;  /* table uid, used to extract tag data while building the final result */
  struct {
    int32_t pageId;
    int32_t offset;
  } tuplePos;    /* location of the tuple data of this chosen row */
} STopBotResItem;

/* Intermediate state of TOP()/BOTTOM(); NOTE(review): pItems presumably points at an array of candidates. */
typedef struct STopBotRes {
  STopBotResItem* pItems;
} STopBotRes;

/*
 * Intermediate state of STDDEV(): row count plus running sum and sum of squares.
 * Integer inputs use the int64_t members, FLOAT/DOUBLE inputs the double members.
 */
typedef struct SStddevRes {
  double  result;  /* final standard deviation, written by stddevFinalize() */
  int64_t count;   /* number of non-NULL values accumulated */
  union {          /* running sum of squares */
    double  quadraticDSum;
    int64_t quadraticISum;
  };
  union {          /* running sum */
    double  dsum;
    int64_t isum;
  };
} SStddevRes;

typedef struct SPercentileInfo {
  double      result;
65
  tMemBucket* pMemBucket;
G
Ganlin Zhao 已提交
66 67 68 69 70 71 72
  int32_t     stage;
  double      minval;
  double      maxval;
  int64_t     numOfElems;
} SPercentileInfo;

/*
 * Intermediate state for a DIFF-style function.
 * NOTE(review): not referenced in this part of the file; the field descriptions
 * below are inferred from their names — confirm against the diff implementation.
 */
typedef struct SDiffInfo {
  bool hasPrev;         /* whether `prev` holds a value yet */
  bool includeNull;
  bool ignoreNegative;
  bool firstOutput;
  union {
    int64_t i64;        /* previous value for integer inputs */
    double  d64;        /* previous value for floating-point inputs */
  } prev;
  int64_t prevTs;       /* timestamp paired with `prev` */
} SDiffInfo;

G
Ganlin Zhao 已提交
85 86 87 88 89 90 91
/* Intermediate state of SPREAD(); NOTE(review): result presumably holds max - min — confirm in the spread functions. */
typedef struct SSpreadInfo {
  double result;
  bool   hasResult;  /* whether min/max have been populated */
  double min;
  double max;
} SSpreadInfo;

92 93 94 95 96 97
/*
 * Record that the result entry `_info` now holds `res` results, but only when the
 * current batch contributed at least one element; an empty batch leaves it untouched.
 */
#define SET_VAL(_info, numOfElem, res) \
  do {                                 \
    if ((numOfElem) > 0) {             \
      (_info)->numOfRes = (res);       \
    }                                  \
  } while (0)

G
Ganlin Zhao 已提交
100 101 102 103 104 105
/* View the context's timestamp column as a TSKEY array, and fetch row `y` from it. */
#define GET_TS_LIST(x)    ((TSKEY*)((x)->ptsList))
#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)])

/*
 * Run `process` on every tag-column function context attached to `ctx`.
 * NOTE(review): the expansion ends in `while (0);`, so it cannot be used as the
 * body of an unbraced if/else. It is kept token-identical because the same macro
 * is defined again later in this file, and a redefinition must match exactly.
 */
#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx)                      \
  do {                                                             \
    for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \
      SqlFunctionCtx* __ctx = (ctx)->tagInfo.pTagCtxList[_i];      \
      __ctx->fpSet.process(__ctx);                                 \
    }                                                              \
  } while (0);

/*
 * Replace `left` with `right` when `right` is larger (sign == 0) or not larger
 * (sign == 1). On replacement, re-run the subsidiary functions for timestamp
 * `_ts` and increment `num`.
 */
#define UPDATE_DATA(ctx, left, right, num, sign, _ts) \
  do {                                                \
    if (((left) < (right)) ^ (sign)) {                \
      (left) = (right);                               \
      DO_UPDATE_SUBSID_RES(ctx, _ts);                 \
      (num) += 1;                                     \
    }                                                 \
  } while (0)

/*
 * Apply UPDATE_DATA to every non-NULL row in [_start, _start + _nrow) of `_col`,
 * viewed as an array of `_t`.
 * NOTE: `num` is only incremented when a row replaces `val`, not once per
 * non-NULL row.
 */
#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num)        \
  do {                                                                   \
    _t* d = (_t*)((_col)->pData);                                        \
    for (int32_t i = (_start); i < (_nrow) + (_start); ++i) {            \
      if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \
        continue;                                                        \
      }                                                                  \
      TSKEY ts = (ctx)->ptsList != NULL ? GET_TS_DATA(ctx, i) : 0;       \
      UPDATE_DATA(ctx, val, d[i], num, sign, ts);                        \
    }                                                                    \
  } while (0)

132
/*
 * Common first-time setup for a result-row entry.
 *
 * Zeroes the context's output buffer (if any) and initializes the entry with the
 * function's intermediate buffer size.
 *
 * @return true if the setup was performed now; false if the entry was already
 *         initialized, in which case nothing is touched.
 */
bool functionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
  bool alreadyDone = pResultInfo->initialized;
  if (!alreadyDone) {
    char* pOut = pCtx->pOutput;
    if (pOut != NULL) {
      memset(pOut, 0, (size_t)pCtx->resDataInfo.bytes);
    }
    initResultRowEntry(pResultInfo, pCtx->resDataInfo.interBufSize);
  }
  return !alreadyDone;
}

145
/*
 * Default finalizer: append the entry's intermediate buffer as the next row of the
 * function's result column in `pBlock`. An entry that never produced a result
 * (numOfRes == 0) is appended as NULL.
 *
 * @return the entry's numOfRes
 */
int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
  SResultRowEntryInfo* pEntry = GET_RES_INFO(pCtx);
  int32_t              slot = pCtx->pExpr->base.resSchema.slotId;
  SColumnInfoData*     pDst = taosArrayGet(pBlock->pDataBlock, slot);

  pEntry->isNullRes = (pEntry->numOfRes == 0) ? 1 : 0;

  char* pValue = GET_ROWCELL_INTERBUF(pEntry);
  colDataAppend(pDst, pBlock->info.rows, pValue, pEntry->isNullRes);

  return pEntry->numOfRes;
}

159
/*
 * Finalizer variant for functions that compute their output in a separate buffer:
 * marks NULL-ness from numOfRes, cleans up the entry, then appends `finalResult`
 * as the next row of the function's result column.
 *
 * @return the entry's numOfRes (read after the cleanup call, as before)
 */
int32_t functionFinalizeWithResultBuf(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, char* finalResult) {
  int32_t              slot = pCtx->pExpr->base.resSchema.slotId;
  SColumnInfoData*     pDst = taosArrayGet(pBlock->pDataBlock, slot);
  SResultRowEntryInfo* pEntry = GET_RES_INFO(pCtx);

  pEntry->isNullRes = (pEntry->numOfRes == 0) ? 1 : 0;
  cleanupResultRowEntry(pEntry);

  colDataAppend(pDst, pBlock->info.rows, finalResult, pEntry->isNullRes);
  return pEntry->numOfRes;
}

173 174 175
/*
 * Data requirement of COUNT(): nothing needs loading when the argument is the
 * primary timestamp column; otherwise block statistics are requested.
 */
EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) {
  SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
  bool   isPrimaryTs = (QUERY_NODE_COLUMN == nodeType(pParam)) &&
                     (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pParam)->colId);
  return isPrimaryTs ? FUNC_DATA_REQUIRED_NOT_LOAD : FUNC_DATA_REQUIRED_STATIS_LOAD;
}
H
Haojun Liao 已提交
180 181 182 183 184 185 186 187 188 189

/* COUNT() keeps a single int64_t running total as its intermediate state. */
bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
  size_t stateBytes = sizeof(int64_t);
  pEnv->calcMemSize = stateBytes;
  return true;
}

/*
 * count function does need the finalize, if data is missing, the default value, which is 0, is used
 * count function does not use the pCtx->interResBuf to keep the intermediate buffer
 */
190
int32_t countFunction(SqlFunctionCtx* pCtx) {
H
Haojun Liao 已提交
191 192 193
  int32_t numOfElem = 0;

  /*
H
Haojun Liao 已提交
194 195 196
   * 1. column data missing (schema modified) causes pInputCol->hasNull == true. pInput->colDataAggIsSet == true;
   * 2. for general non-primary key columns, pInputCol->hasNull may be true or false, pInput->colDataAggIsSet == true;
   * 3. for primary key column, pInputCol->hasNull always be false, pInput->colDataAggIsSet == false;
H
Haojun Liao 已提交
197 198
   */
  SInputColumnInfoData* pInput = &pCtx->input;
199
  SColumnInfoData*      pInputCol = pInput->pData[0];
H
Haojun Liao 已提交
200 201 202 203 204 205 206 207 208 209 210 211
  if (pInput->colDataAggIsSet && pInput->totalRows == pInput->numOfRows) {
    numOfElem = pInput->numOfRows - pInput->pColumnDataAgg[0]->numOfNull;
    ASSERT(numOfElem >= 0);
  } else {
    if (pInputCol->hasNull) {
      for (int32_t i = pInput->startRowIndex; i < pInput->startRowIndex + pInput->numOfRows; ++i) {
        if (colDataIsNull(pInputCol, pInput->totalRows, i, NULL)) {
          continue;
        }
        numOfElem += 1;
      }
    } else {
212 213
      // when counting on the primary time stamp column and no statistics data is presented, use the size value
      // directly.
H
Haojun Liao 已提交
214 215 216 217 218
      numOfElem = pInput->numOfRows;
    }
  }

  SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
219 220
  char*                buf = GET_ROWCELL_INTERBUF(pResInfo);
  *((int64_t*)buf) += numOfElem;
H
Haojun Liao 已提交
221 222

  SET_VAL(pResInfo, numOfElem, 1);
wmmhello's avatar
wmmhello 已提交
223
  return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
224 225 226 227
}

/*
 * Add every non-NULL value in rows [_start, _start + _rows) of column `_col`
 * (whose data is viewed as an array of `_t`) into `_res`, and increment
 * `numOfElem` once per value added.
 *
 * All arguments are parenthesized, so `_col` may be any pointer expression
 * (e.g. `&col`). The loop locals use reserved-looking names so they cannot
 * shadow caller variables passed in as arguments.
 */
#define LIST_ADD_N(_res, _col, _start, _rows, _t, numOfElem)              \
  do {                                                                    \
    const _t* _plist = (const _t*)((_col)->pData);                        \
    for (int32_t _row = (_start); _row < (_rows) + (_start); ++_row) {    \
      if ((_col)->hasNull && colDataIsNull_f((_col)->nullbitmap, _row)) { \
        continue;                                                         \
      }                                                                   \
      (_res) += _plist[_row];                                             \
      (numOfElem)++;                                                      \
    }                                                                     \
  } while (0)

238
/*
 * SUM(): add the non-NULL values of the current input block to the SSumRes held in
 * the result row's intermediate buffer.
 *
 * Signed integers and BOOL accumulate into isum, unsigned integers into usum, and
 * FLOAT/DOUBLE into dsum. When block statistics are available (colDataAggIsSet),
 * the pre-computed sum and NULL count are used instead of scanning the rows.
 *
 * @return TSDB_CODE_SUCCESS; the entry is marked as having a result only when the
 *         block contributed at least one non-NULL value.
 */
int32_t sumFunction(SqlFunctionCtx* pCtx) {
  int32_t numOfElem = 0;

  SInputColumnInfoData* pInput = &pCtx->input;
  SColumnDataAgg*       pAgg = pInput->pColumnDataAgg[0];
  int32_t               type = pInput->pData[0]->info.type;

  SSumRes* pSumRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));

  if (pInput->colDataAggIsSet) {
    // Only the pre-computed statistics are used; the raw column data is not read.
    numOfElem = pInput->numOfRows - pAgg->numOfNull;
    ASSERT(numOfElem >= 0);

    // BOOL is summed as a signed integer, matching the raw-data path below, so its
    // pre-computed sum is not silently dropped.
    // NOTE(review): assumes the statistics' `sum` of a BOOL column is the integer
    // count of true values — confirm against the statistics builder.
    if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL) {
      pSumRes->isum += pAgg->sum;
    } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
      pSumRes->usum += pAgg->sum;
    } else if (IS_FLOAT_TYPE(type)) {
      pSumRes->dsum += GET_DOUBLE_VAL((const char*)&(pAgg->sum));
    }
  } else {  // computing based on the true data block
    SColumnInfoData* pCol = pInput->pData[0];

    int32_t start = pInput->startRowIndex;
    int32_t numOfRows = pInput->numOfRows;

    if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL) {
      if (type == TSDB_DATA_TYPE_TINYINT || type == TSDB_DATA_TYPE_BOOL) {
        LIST_ADD_N(pSumRes->isum, pCol, start, numOfRows, int8_t, numOfElem);
      } else if (type == TSDB_DATA_TYPE_SMALLINT) {
        LIST_ADD_N(pSumRes->isum, pCol, start, numOfRows, int16_t, numOfElem);
      } else if (type == TSDB_DATA_TYPE_INT) {
        LIST_ADD_N(pSumRes->isum, pCol, start, numOfRows, int32_t, numOfElem);
      } else if (type == TSDB_DATA_TYPE_BIGINT) {
        LIST_ADD_N(pSumRes->isum, pCol, start, numOfRows, int64_t, numOfElem);
      }
    } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
      if (type == TSDB_DATA_TYPE_UTINYINT) {
        LIST_ADD_N(pSumRes->usum, pCol, start, numOfRows, uint8_t, numOfElem);
      } else if (type == TSDB_DATA_TYPE_USMALLINT) {
        LIST_ADD_N(pSumRes->usum, pCol, start, numOfRows, uint16_t, numOfElem);
      } else if (type == TSDB_DATA_TYPE_UINT) {
        LIST_ADD_N(pSumRes->usum, pCol, start, numOfRows, uint32_t, numOfElem);
      } else if (type == TSDB_DATA_TYPE_UBIGINT) {
        LIST_ADD_N(pSumRes->usum, pCol, start, numOfRows, uint64_t, numOfElem);
      }
    } else if (type == TSDB_DATA_TYPE_DOUBLE) {
      LIST_ADD_N(pSumRes->dsum, pCol, start, numOfRows, double, numOfElem);
    } else if (type == TSDB_DATA_TYPE_FLOAT) {
      LIST_ADD_N(pSumRes->dsum, pCol, start, numOfRows, float, numOfElem);
    }
  }

  // data in the check operation are all null, not output
  SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
297
/* SUM() keeps one SSumRes (a typed running total) as its intermediate state. */
bool getSumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
  size_t stateBytes = sizeof(SSumRes);
  pEnv->calcMemSize = stateBytes;
  return true;
}

G
Ganlin Zhao 已提交
302 303 304 305 306
/*
 * AVG() keeps a whole SAvgRes (result, running sum and count) as its intermediate
 * state. avgFunctionSetup() memsets sizeof(SAvgRes) bytes of this buffer, and
 * avgFunction()/avgFinalize() write all three fields. Reserving only sizeof(double)
 * would let those writes run past the end of the buffer.
 */
bool getAvgFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
  pEnv->calcMemSize = sizeof(SAvgRes);
  return true;
}

307
/*
 * AVG() setup: on first use of the entry, run the common setup and zero the
 * SAvgRes state. An already-initialized entry is left as is and false is returned.
 */
bool avgFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
  bool fresh = functionSetup(pCtx, pResultInfo);
  if (fresh) {
    SAvgRes* pState = GET_ROWCELL_INTERBUF(pResultInfo);
    memset(pState, 0, sizeof(SAvgRes));
  }
  return fresh;
}

/*
 * Fold the non-NULL rows [_start, _start + _rows) of `_col` (viewed as `_t`) into
 * the AVG state: add each value to `_acc`, and bump both `_pAvg->count` and the
 * caller's `_numOfElem`.
 */
#define AVG_ACCUMULATE_N(_t, _col, _start, _rows, _acc, _pAvg, _numOfElem)     \
  do {                                                                        \
    const _t* _plist = (const _t*)((_col)->pData);                            \
    for (int32_t _row = (_start); _row < (_start) + (_rows); ++_row) {        \
      if ((_col)->hasNull && colDataIsNull_f((_col)->nullbitmap, _row)) {     \
        continue;                                                             \
      }                                                                       \
      (_numOfElem) += 1;                                                      \
      (_pAvg)->count += 1;                                                    \
      (_acc) += _plist[_row];                                                 \
    }                                                                         \
  } while (0)

/*
 * AVG(): accumulate the sum and count of the non-NULL values of the current input
 * block into the SAvgRes intermediate state. The division happens in avgFinalize().
 *
 * Signed integers accumulate into sum.isum, unsigned integers into sum.usum, and
 * FLOAT/DOUBLE into sum.dsum. Unsigned columns used to fall through to `default`
 * and were silently ignored. usum shares storage with isum (SSumRes is a union),
 * so a finalizer that reads isum for integer types sees the same total while it
 * stays below 2^63.
 * NOTE(review): BOOL inputs and block statistics (colDataAggIsSet) are not handled
 * here, unlike sumFunction() — confirm that is intended.
 */
int32_t avgFunction(SqlFunctionCtx* pCtx) {
  int32_t numOfElem = 0;

  SInputColumnInfoData* pInput = &pCtx->input;
  int32_t               type = pInput->pData[0]->info.type;

  SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));

  // computing based on the true data block
  SColumnInfoData* pCol = pInput->pData[0];

  int32_t start = pInput->startRowIndex;
  int32_t numOfRows = pInput->numOfRows;

  switch (type) {
    case TSDB_DATA_TYPE_TINYINT:
      AVG_ACCUMULATE_N(int8_t, pCol, start, numOfRows, pAvgRes->sum.isum, pAvgRes, numOfElem);
      break;
    case TSDB_DATA_TYPE_SMALLINT:
      AVG_ACCUMULATE_N(int16_t, pCol, start, numOfRows, pAvgRes->sum.isum, pAvgRes, numOfElem);
      break;
    case TSDB_DATA_TYPE_INT:
      AVG_ACCUMULATE_N(int32_t, pCol, start, numOfRows, pAvgRes->sum.isum, pAvgRes, numOfElem);
      break;
    case TSDB_DATA_TYPE_BIGINT:
      AVG_ACCUMULATE_N(int64_t, pCol, start, numOfRows, pAvgRes->sum.isum, pAvgRes, numOfElem);
      break;

    case TSDB_DATA_TYPE_UTINYINT:
      AVG_ACCUMULATE_N(uint8_t, pCol, start, numOfRows, pAvgRes->sum.usum, pAvgRes, numOfElem);
      break;
    case TSDB_DATA_TYPE_USMALLINT:
      AVG_ACCUMULATE_N(uint16_t, pCol, start, numOfRows, pAvgRes->sum.usum, pAvgRes, numOfElem);
      break;
    case TSDB_DATA_TYPE_UINT:
      AVG_ACCUMULATE_N(uint32_t, pCol, start, numOfRows, pAvgRes->sum.usum, pAvgRes, numOfElem);
      break;
    case TSDB_DATA_TYPE_UBIGINT:
      AVG_ACCUMULATE_N(uint64_t, pCol, start, numOfRows, pAvgRes->sum.usum, pAvgRes, numOfElem);
      break;

    case TSDB_DATA_TYPE_FLOAT:
      AVG_ACCUMULATE_N(float, pCol, start, numOfRows, pAvgRes->sum.dsum, pAvgRes, numOfElem);
      break;
    case TSDB_DATA_TYPE_DOUBLE:
      AVG_ACCUMULATE_N(double, pCol, start, numOfRows, pAvgRes->sum.dsum, pAvgRes, numOfElem);
      break;

    default:
      break;
  }

  // data in the check operation are all null, not output
  SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
  return TSDB_CODE_SUCCESS;
}

#undef AVG_ACCUMULATE_N

H
Haojun Liao 已提交
428
/*
 * AVG() finalizer: compute result = sum / count using the sum member that matches
 * the input type, then append the result via functionFinalize().
 *
 * Unsigned inputs are read from sum.usum, so totals of 2^63 or more are not
 * misread as negative through isum. With count == 0 the division is skipped
 * (avoids 0/0 → NaN). In that case avgFunction() never set numOfRes, so
 * functionFinalize() presumably emits NULL.
 */
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
  SInputColumnInfoData* pInput = &pCtx->input;
  int32_t               type = pInput->pData[0]->info.type;
  SAvgRes*              pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));

  if (pAvgRes->count == 0) {
    pAvgRes->result = 0.0;
  } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
    pAvgRes->result = pAvgRes->sum.usum / ((double)pAvgRes->count);
  } else if (IS_INTEGER_TYPE(type)) {
    pAvgRes->result = pAvgRes->sum.isum / ((double)pAvgRes->count);
  } else {
    pAvgRes->result = pAvgRes->sum.dsum / ((double)pAvgRes->count);
  }

  return functionFinalize(pCtx, pBlock);
}

441
/* Data requirement shared by statistics-capable functions: always ask for block statistics. */
EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) {
  (void)pFunc;
  (void)pTimeWindow;
  return FUNC_DATA_REQUIRED_STATIS_LOAD;
}

445
/*
 * MAX() setup: after the common setup, seed the running maximum with the lowest
 * value of the result type. Integers get their type minimum, unsigned types and
 * BOOL get 0, and FLOAT/DOUBLE get the most negative finite value.
 *
 * @return false (and no seeding) if the entry was already initialized.
 */
bool maxFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
  if (!functionSetup(pCtx, pResultInfo)) {
    return false;
  }

  char* pSeed = GET_ROWCELL_INTERBUF(pResultInfo);
  switch (pCtx->resDataInfo.type) {
    // signed integers
    case TSDB_DATA_TYPE_TINYINT:
      *((int8_t*)pSeed) = INT8_MIN;
      break;
    case TSDB_DATA_TYPE_SMALLINT:
      *((int16_t*)pSeed) = INT16_MIN;
      break;
    case TSDB_DATA_TYPE_INT:
      *((int32_t*)pSeed) = INT32_MIN;
      break;
    case TSDB_DATA_TYPE_BIGINT:
      *((int64_t*)pSeed) = INT64_MIN;
      break;

    // bool and unsigned integers
    case TSDB_DATA_TYPE_BOOL:
      *((int8_t*)pSeed) = 0;
      break;
    case TSDB_DATA_TYPE_UTINYINT:
      *((uint8_t*)pSeed) = 0;
      break;
    case TSDB_DATA_TYPE_USMALLINT:
      *((uint16_t*)pSeed) = 0;
      break;
    case TSDB_DATA_TYPE_UINT:
      *((uint32_t*)pSeed) = 0;
      break;
    case TSDB_DATA_TYPE_UBIGINT:
      *((uint64_t*)pSeed) = 0;
      break;

    // floating point
    case TSDB_DATA_TYPE_FLOAT:
      *((float*)pSeed) = -FLT_MAX;
      break;
    case TSDB_DATA_TYPE_DOUBLE:
      SET_DOUBLE_VAL(((double*)pSeed), -DBL_MAX);
      break;

    default:
      assert(0);
  }
  return true;
}

491
/*
 * MIN() setup: after the common setup, seed the running minimum with the highest
 * value of the result type. Integers get their type maximum, BOOL gets 1, and
 * FLOAT/DOUBLE get the largest finite value.
 *
 * @return false (and no seeding) if the entry was already initialized.
 */
bool minFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
  if (!functionSetup(pCtx, pResultInfo)) {
    return false;
  }

  char* pSeed = GET_ROWCELL_INTERBUF(pResultInfo);
  switch (pCtx->resDataInfo.type) {
    // signed integers
    case TSDB_DATA_TYPE_TINYINT:
      *((int8_t*)pSeed) = INT8_MAX;
      break;
    case TSDB_DATA_TYPE_SMALLINT:
      *((int16_t*)pSeed) = INT16_MAX;
      break;
    case TSDB_DATA_TYPE_INT:
      *((int32_t*)pSeed) = INT32_MAX;
      break;
    case TSDB_DATA_TYPE_BIGINT:
      *((int64_t*)pSeed) = INT64_MAX;
      break;

    // unsigned integers
    case TSDB_DATA_TYPE_UTINYINT:
      *((uint8_t*)pSeed) = UINT8_MAX;
      break;
    case TSDB_DATA_TYPE_USMALLINT:
      *((uint16_t*)pSeed) = UINT16_MAX;
      break;
    case TSDB_DATA_TYPE_UINT:
      *((uint32_t*)pSeed) = UINT32_MAX;
      break;
    case TSDB_DATA_TYPE_UBIGINT:
      *((uint64_t*)pSeed) = UINT64_MAX;
      break;

    // floating point
    case TSDB_DATA_TYPE_FLOAT:
      *((float*)pSeed) = FLT_MAX;
      break;
    case TSDB_DATA_TYPE_DOUBLE:
      SET_DOUBLE_VAL(((double*)pSeed), DBL_MAX);
      break;

    // bool
    case TSDB_DATA_TYPE_BOOL:
      *((int8_t*)pSeed) = 1;
      break;

    default:
      assert(0);
  }
  return true;
}

H
Haojun Liao 已提交
538
/* MIN()/MAX() keep their running extreme in an 8-byte slot, wide enough for every seeded type. */
bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
  size_t slotBytes = sizeof(int64_t);
  pEnv->calcMemSize = slotBytes;
  return true;
}

/*
 * GET_TS_LIST, GET_TS_DATA and DO_UPDATE_TAG_COLUMNS_WITHOUT_TS are defined earlier
 * in this file. A verbatim second copy here only creates a redefinition that must
 * be kept token-identical by hand, so it is omitted.
 */

554 555
/*
 * Re-run every subsidiary function of `ctx` after the running extreme changed.
 * A FUNCTION_TS_DUMMY subsidiary first gets `ts` stored as a BIGINT tag value.
 * Internal names avoid the reserved double-underscore prefix.
 */
#define DO_UPDATE_SUBSID_RES(ctx, ts)                              \
  do {                                                             \
    for (int32_t _k = 0; _k < (ctx)->subsidiaries.num; ++_k) {     \
      SqlFunctionCtx* _pSub = (ctx)->subsidiaries.pCtx[_k];        \
      if (_pSub->functionId == FUNCTION_TS_DUMMY) {                \
        _pSub->tag.nType = TSDB_DATA_TYPE_BIGINT;                  \
        _pSub->tag.i = (ts);                                       \
      }                                                            \
      _pSub->fpSet.process(_pSub);                                 \
    }                                                              \
  } while (0)

/*
 * UPDATE_DATA and LOOPCHECK_N are defined earlier in this file. A verbatim second
 * copy here only creates a redefinition that must be kept token-identical by
 * hand, so it is omitted.
 */

587
/*
 * Scan the non-NULL rows [_start, _start + _rows) of `_col` (viewed as `_t`) and fold
 * them into the running extreme at `_buf`. Every non-NULL row increments
 * `_numOfElems`, whether or not it replaces the extreme. The subsidiary functions are
 * re-run only on a replacement. `_isMin` selects MIN (1) or MAX (0) through the
 * `(cur < v) ^ isMin` test used by UPDATE_DATA.
 */
#define MINMAX_SCAN_N(_t, _col, _ctx, _buf, _start, _rows, _isMin, _numOfElems) \
  do {                                                                          \
    const _t* _plist = (const _t*)((_col)->pData);                              \
    _t*       _pCur = (_t*)(_buf);                                              \
    for (int32_t _j = (_start); _j < (_start) + (_rows); ++_j) {                \
      if ((_col)->hasNull && colDataIsNull_f((_col)->nullbitmap, _j)) {         \
        continue;                                                               \
      }                                                                         \
      (_numOfElems) += 1;                                                       \
      if ((*_pCur < _plist[_j]) ^ (_isMin)) {                                   \
        *_pCur = _plist[_j];                                                    \
        TSKEY _ts = ((_ctx)->ptsList != NULL) ? GET_TS_DATA(_ctx, _j) : 0;      \
        DO_UPDATE_SUBSID_RES(_ctx, _ts);                                        \
      }                                                                         \
    }                                                                           \
  } while (0)

/*
 * MIN()/MAX() over the current input block.
 *
 * Folds the block into the running extreme stored in the result row's intermediate
 * buffer, which is seeded by minFunctionSetup()/maxFunctionSetup(). With block
 * statistics the pre-computed min/max is used; otherwise every non-NULL row is
 * scanned. Whenever the extreme is replaced, the subsidiary functions are re-run
 * for the corresponding row's timestamp.
 *
 * @param pCtx      function context whose `input` holds the block
 * @param isMinFunc 1 for MIN, 0 for MAX
 * @return number of non-NULL values in the block (callers only test it for > 0).
 *         Non-replacing rows are counted too. Otherwise e.g. MAX over an all-false
 *         BOOL or all-zero unsigned column, whose values equal the seed, would
 *         report NULL.
 */
int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
  int32_t numOfElems = 0;

  SInputColumnInfoData* pInput = &pCtx->input;
  SColumnDataAgg*       pAgg = pInput->pColumnDataAgg[0];
  SColumnInfoData*      pCol = pInput->pData[0];
  int32_t               type = pCol->info.type;

  SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
  char*                buf = GET_ROWCELL_INTERBUF(pResInfo);

  // data in current data block are qualified to the query
  if (pInput->colDataAggIsSet) {
    numOfElems = pInput->numOfRows - pAgg->numOfNull;
    ASSERT(pInput->numOfRows == pInput->totalRows && numOfElems >= 0);

    if (numOfElems == 0) {
      return numOfElems;
    }

    void*   tval = isMinFunc ? (void*)&pAgg->min : (void*)&pAgg->max;
    int16_t index = isMinFunc ? pAgg->minIndex : pAgg->maxIndex;

    // the index is the original position, not the relative position
    TSKEY key = (pCtx->ptsList != NULL) ? pCtx->ptsList[index] : TSKEY_INITIAL_VAL;

    // NOTE(review): the integer branches store a full 8 bytes regardless of the column
    // width; the buffer is sizeof(int64_t) (getMinmaxFuncEnv), but narrower types then
    // appear to rely on a little-endian layout — confirm.
    if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL) {
      // BOOL is handled like a signed integer, as in the raw-data path below.
      int64_t prev = 0;
      GET_TYPED_DATA(prev, int64_t, type, buf);

      int64_t val = GET_INT64_VAL(tval);
      if ((prev < val) ^ isMinFunc) {
        *(int64_t*)buf = val;
        DO_UPDATE_SUBSID_RES(pCtx, key);
      }
    } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
      uint64_t prev = 0;
      GET_TYPED_DATA(prev, uint64_t, type, buf);

      uint64_t val = GET_UINT64_VAL(tval);
      if ((prev < val) ^ isMinFunc) {
        *(uint64_t*)buf = val;
        DO_UPDATE_SUBSID_RES(pCtx, key);
      }
    } else if (type == TSDB_DATA_TYPE_DOUBLE) {
      double val = GET_DOUBLE_VAL(tval);
      if ((*(double*)buf < val) ^ isMinFunc) {
        *(double*)buf = val;
        DO_UPDATE_SUBSID_RES(pCtx, key);
      }
    } else if (type == TSDB_DATA_TYPE_FLOAT) {
      double val = GET_DOUBLE_VAL(tval);
      if ((*(float*)buf < val) ^ isMinFunc) {
        *(float*)buf = (float)val;
        DO_UPDATE_SUBSID_RES(pCtx, key);
      }
    }

    return numOfElems;
  }

  int32_t start = pInput->startRowIndex;
  int32_t numOfRows = pInput->numOfRows;

  switch (type) {
    case TSDB_DATA_TYPE_BOOL:
    case TSDB_DATA_TYPE_TINYINT:
      MINMAX_SCAN_N(int8_t, pCol, pCtx, buf, start, numOfRows, isMinFunc, numOfElems);
      break;
    case TSDB_DATA_TYPE_SMALLINT:
      MINMAX_SCAN_N(int16_t, pCol, pCtx, buf, start, numOfRows, isMinFunc, numOfElems);
      break;
    case TSDB_DATA_TYPE_INT:
      MINMAX_SCAN_N(int32_t, pCol, pCtx, buf, start, numOfRows, isMinFunc, numOfElems);
      break;
    case TSDB_DATA_TYPE_BIGINT:
      MINMAX_SCAN_N(int64_t, pCol, pCtx, buf, start, numOfRows, isMinFunc, numOfElems);
      break;
    case TSDB_DATA_TYPE_UTINYINT:
      MINMAX_SCAN_N(uint8_t, pCol, pCtx, buf, start, numOfRows, isMinFunc, numOfElems);
      break;
    case TSDB_DATA_TYPE_USMALLINT:
      MINMAX_SCAN_N(uint16_t, pCol, pCtx, buf, start, numOfRows, isMinFunc, numOfElems);
      break;
    case TSDB_DATA_TYPE_UINT:
      MINMAX_SCAN_N(uint32_t, pCol, pCtx, buf, start, numOfRows, isMinFunc, numOfElems);
      break;
    case TSDB_DATA_TYPE_UBIGINT:
      MINMAX_SCAN_N(uint64_t, pCol, pCtx, buf, start, numOfRows, isMinFunc, numOfElems);
      break;
    case TSDB_DATA_TYPE_DOUBLE:
      MINMAX_SCAN_N(double, pCol, pCtx, buf, start, numOfRows, isMinFunc, numOfElems);
      break;
    case TSDB_DATA_TYPE_FLOAT:
      MINMAX_SCAN_N(float, pCol, pCtx, buf, start, numOfRows, isMinFunc, numOfElems);
      break;
    default:
      break;
  }

  return numOfElems;
}

#undef MINMAX_SCAN_N
717

718
/* MIN() entry point: fold the block via doMinMaxHelper() and mark the entry if anything qualified. */
int32_t minFunction(SqlFunctionCtx* pCtx) {
  const int32_t isMinFunc = 1;
  int32_t       qualified = doMinMaxHelper(pCtx, isMinFunc);
  SResultRowEntryInfo* pEntry = GET_RES_INFO(pCtx);
  SET_VAL(pEntry, qualified, 1);
  return TSDB_CODE_SUCCESS;
}

724
/* MAX() entry point: fold the block via doMinMaxHelper() and mark the entry if anything qualified. */
int32_t maxFunction(SqlFunctionCtx* pCtx) {
  const int32_t isMinFunc = 0;
  int32_t       qualified = doMinMaxHelper(pCtx, isMinFunc);
  SResultRowEntryInfo* pEntry = GET_RES_INFO(pCtx);
  SET_VAL(pEntry, qualified, 1);
  return TSDB_CODE_SUCCESS;
}

/* STDDEV() keeps one SStddevRes (count, sum, sum of squares, result) as its intermediate state. */
bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
  size_t stateBytes = sizeof(SStddevRes);
  pEnv->calcMemSize = stateBytes;
  return true;
}

735
/*
 * STDDEV() setup: on first use of the entry, run the common setup and zero the
 * SStddevRes state. An already-initialized entry is left as is and false is returned.
 */
bool stddevFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
  bool fresh = functionSetup(pCtx, pResultInfo);
  if (fresh) {
    SStddevRes* pState = GET_ROWCELL_INTERBUF(pResultInfo);
    memset(pState, 0, sizeof(SStddevRes));
  }
  return fresh;
}

H
Haojun Liao 已提交
745
/*
 * Fold the non-NULL rows [_start, _start + _rows) of `_col` (viewed as `_t`) into the
 * STDDEV state `_res`: count += 1, `_sum` += x, `_sqSum` += x*x. Each value is first
 * widened to `_wide`, so the square is computed in that type rather than in the
 * (possibly narrower) column type.
 */
#define STDDEV_ACCUMULATE_N(_t, _wide, _col, _start, _rows, _res, _sum, _sqSum, _numOfElem) \
  do {                                                                                    \
    const _t* _plist = (const _t*)((_col)->pData);                                        \
    for (int32_t _row = (_start); _row < (_start) + (_rows); ++_row) {                    \
      if ((_col)->hasNull && colDataIsNull_f((_col)->nullbitmap, _row)) {                 \
        continue;                                                                         \
      }                                                                                   \
      _wide _v = (_wide)_plist[_row];                                                     \
      (_numOfElem) += 1;                                                                  \
      (_res)->count += 1;                                                                 \
      (_res)->_sum += _v;                                                                 \
      (_res)->_sqSum += _v * _v;                                                          \
    }                                                                                     \
  } while (0)

/*
 * STDDEV(): accumulate count, sum and sum of squares of the non-NULL values of the
 * current input block into the SStddevRes state. The square root is taken in
 * stddevFinalize().
 *
 * Integer inputs are squared in int64_t: previously INT values were squared in
 * 32-bit arithmetic, which overflows (undefined behavior) for |x| > 46340. FLOAT
 * inputs are squared in double instead of float precision.
 * NOTE(review): BIGINT squares still overflow int64_t for |x| > ~3.04e9, and unsigned
 * inputs and block statistics are not handled here — confirm.
 */
int32_t stddevFunction(SqlFunctionCtx* pCtx) {
  int32_t numOfElem = 0;

  SInputColumnInfoData* pInput = &pCtx->input;
  int32_t               type = pInput->pData[0]->info.type;

  SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));

  // computing based on the true data block
  SColumnInfoData* pCol = pInput->pData[0];

  int32_t start = pInput->startRowIndex;
  int32_t numOfRows = pInput->numOfRows;

  switch (type) {
    case TSDB_DATA_TYPE_TINYINT:
      STDDEV_ACCUMULATE_N(int8_t, int64_t, pCol, start, numOfRows, pStddevRes, isum, quadraticISum, numOfElem);
      break;
    case TSDB_DATA_TYPE_SMALLINT:
      STDDEV_ACCUMULATE_N(int16_t, int64_t, pCol, start, numOfRows, pStddevRes, isum, quadraticISum, numOfElem);
      break;
    case TSDB_DATA_TYPE_INT:
      STDDEV_ACCUMULATE_N(int32_t, int64_t, pCol, start, numOfRows, pStddevRes, isum, quadraticISum, numOfElem);
      break;
    case TSDB_DATA_TYPE_BIGINT:
      STDDEV_ACCUMULATE_N(int64_t, int64_t, pCol, start, numOfRows, pStddevRes, isum, quadraticISum, numOfElem);
      break;
    case TSDB_DATA_TYPE_FLOAT:
      STDDEV_ACCUMULATE_N(float, double, pCol, start, numOfRows, pStddevRes, dsum, quadraticDSum, numOfElem);
      break;
    case TSDB_DATA_TYPE_DOUBLE:
      STDDEV_ACCUMULATE_N(double, double, pCol, start, numOfRows, pStddevRes, dsum, quadraticDSum, numOfElem);
      break;
    default:
      break;
  }

  // data in the check operation are all null, not output
  SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
  return TSDB_CODE_SUCCESS;
}

#undef STDDEV_ACCUMULATE_N

H
Haojun Liao 已提交
862
/*
 * STDDEV() finalizer: population standard deviation sqrt(E[x^2] - E[x]^2) from the
 * accumulated count, sum and sum of squares, then append via functionFinalize().
 *
 * With count == 0 the divisions are skipped (result 0; the entry is presumably
 * emitted as NULL because stddevFunction() never set numOfRes). Floating-point
 * cancellation can make E[x^2] - E[x]^2 slightly negative, e.g. when all values are
 * equal, and sqrt() of a negative argument is NaN. The variance is therefore
 * clamped at 0; a NaN variance from NaN input still propagates.
 */
int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
  SInputColumnInfoData* pInput = &pCtx->input;
  int32_t               type = pInput->pData[0]->info.type;
  SStddevRes*           pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));

  if (pStddevRes->count == 0) {
    pStddevRes->result = 0.0;
    return functionFinalize(pCtx, pBlock);
  }

  double count = (double)pStddevRes->count;
  double avg;
  double meanOfSquares;
  if (IS_INTEGER_TYPE(type)) {
    avg = pStddevRes->isum / count;
    meanOfSquares = pStddevRes->quadraticISum / count;
  } else {
    avg = pStddevRes->dsum / count;
    meanOfSquares = pStddevRes->quadraticDSum / count;
  }

  double variance = meanOfSquares - avg * avg;
  if (variance < 0.0) {
    variance = 0.0;
  }
  pStddevRes->result = sqrt(variance);

  return functionFinalize(pCtx, pBlock);
}

/* percentile() keeps one SPercentileInfo per result row as its intermediate state. */
bool getPercentileFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
  const size_t stateSize = sizeof(SPercentileInfo);
  pEnv->calcMemSize = stateSize;
  return true;
}

883
/*
 * Initialise percentile() state. min/max start at opposite extremes so that the first value seen
 * during the min/max-collecting first stage replaces both; the element count starts at zero.
 */
bool percentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
  bool ok = functionSetup(pCtx, pResultInfo);
  if (ok) {
    SPercentileInfo* pState = GET_ROWCELL_INTERBUF(pResultInfo);
    pState->numOfElems = 0;
    SET_DOUBLE_VAL(&pState->maxval, -DBL_MAX);
    SET_DOUBLE_VAL(&pState->minval, DBL_MAX);
  }

  return ok;
}

897 898 899
/*
 * percentile() runs in two stages over the same data:
 *   stage 0 (normal scan): only collect the min/max of all non-NULL values and count them;
 *   stage 1 (REPEAT_SCAN): create a tMemBucket bounded by that min/max and put every non-NULL
 *                          value into it, for percentileFinalize() to query.
 */
int32_t percentileFunction(SqlFunctionCtx* pCtx) {
  SResultRowEntryInfo*  pResInfo = GET_RES_INFO(pCtx);
  SPercentileInfo*      pInfo = GET_ROWCELL_INTERBUF(pResInfo);
  SInputColumnInfoData* pInput = &pCtx->input;
  SColumnDataAgg*       pAgg = pInput->pColumnDataAgg[0];
  SColumnInfoData*      pCol = pInput->pData[0];
  int32_t               type = pCol->info.type;
  int32_t               start = pInput->startRowIndex;
  int32_t               end = pInput->startRowIndex + pInput->numOfRows;  // exclusive

  // First call of the repeat scan: leave stage 0 and build the bucket from the collected range.
  if (pCtx->currentStage == REPEAT_SCAN && pInfo->stage == 0) {
    pInfo->stage += 1;

    // all data are null, set it completed
    if (pInfo->numOfElems == 0) {
      pResInfo->complete = true;
      return 0;
    }

    pInfo->pMemBucket = tMemBucketCreate(pCtx->inputBytes, pCtx->inputType, pInfo->minval, pInfo->maxval);
  }

  // the first stage, only acquire the min/max value
  if (pInfo->stage == 0) {
    if (pInput->colDataAggIsSet) {
      int64_t numOfValid = pInput->numOfRows - pAgg->numOfNull;
      // An all-NULL block has no meaningful min/max in its pre-computed aggregate; folding it in
      // would corrupt the bucket range (spreadFunction() skips such blocks as well).
      if (numOfValid <= 0) {
        return 0;
      }

      double tmin = 0.0, tmax = 0.0;
      if (IS_SIGNED_NUMERIC_TYPE(type)) {
        tmin = (double)GET_INT64_VAL(&pAgg->min);
        tmax = (double)GET_INT64_VAL(&pAgg->max);
      } else if (IS_FLOAT_TYPE(type)) {
        tmin = GET_DOUBLE_VAL(&pAgg->min);
        tmax = GET_DOUBLE_VAL(&pAgg->max);
      } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
        tmin = (double)GET_UINT64_VAL(&pAgg->min);
        tmax = (double)GET_UINT64_VAL(&pAgg->max);
      }

      if (GET_DOUBLE_VAL(&pInfo->minval) > tmin) {
        SET_DOUBLE_VAL(&pInfo->minval, tmin);
      }

      if (GET_DOUBLE_VAL(&pInfo->maxval) < tmax) {
        SET_DOUBLE_VAL(&pInfo->maxval, tmax);
      }

      pInfo->numOfElems += numOfValid;
    } else {
      // check the valid data one by one
      for (int32_t i = start; i < end; ++i) {
        if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
          continue;
        }

        char* data = colDataGetData(pCol, i);

        double v = 0;
        GET_TYPED_DATA(v, double, pCtx->inputType, data);
        if (v < GET_DOUBLE_VAL(&pInfo->minval)) {
          SET_DOUBLE_VAL(&pInfo->minval, v);
        }

        if (v > GET_DOUBLE_VAL(&pInfo->maxval)) {
          SET_DOUBLE_VAL(&pInfo->maxval, v);
        }

        pInfo->numOfElems += 1;
      }
    }

    return 0;
  }

  // the second stage, calculate the true percentile value
  int32_t notNullElems = 0;
  for (int32_t i = start; i < end; ++i) {
    if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
      continue;
    }

    char* data = colDataGetData(pCol, i);

    notNullElems += 1;
    tMemBucketPut(pInfo->pMemBucket, data, 1);
  }

  SET_VAL(pResInfo, notNullElems, 1);
  return TSDB_CODE_SUCCESS;
}

988
/*
 * Finalize percentile(): query the bucket built during the second stage for the percentile given
 * as function parameter 1, destroy the bucket, then run the common functionFinalize() step.
 */
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
  SVariant* pVal = &pCtx->param[1].param;

  // The percentile literal may be any integer width (not only INT) or a floating value; reading
  // the `d` member of an integer variant would reinterpret the integer bits as a double.
  double v = 0.0;
  if (IS_SIGNED_NUMERIC_TYPE(pVal->nType)) {
    v = (double)pVal->i;
  } else if (IS_UNSIGNED_NUMERIC_TYPE(pVal->nType)) {
    v = (double)pVal->u;
  } else {
    v = pVal->d;
  }

  SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
  SPercentileInfo*     ppInfo = (SPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo);

  tMemBucket* pMemBucket = ppInfo->pMemBucket;
  if (pMemBucket != NULL && pMemBucket->total > 0) {  // check for null
    SET_DOUBLE_VAL(&ppInfo->result, getPercentile(pMemBucket, v));
  }

  tMemBucketDestroy(pMemBucket);
  // Do not leave a dangling pointer in the state: a repeated finalize would free it again.
  ppInfo->pMemBucket = NULL;

  return functionFinalize(pCtx, pBlock);
}
H
Haojun Liao 已提交
1003

H
Haojun Liao 已提交
1004 1005
/*
 * first()/last() keep the selected value followed by its timestamp, so the intermediate buffer is
 * the width of the input column (parameter 0) plus one int64_t.
 */
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
  SColumnNode* pColNode = nodesListGetNode(pFunc->pParameterList, 0);
  pEnv->calcMemSize = sizeof(int64_t) + pColNode->node.resType.bytes;
  return true;
}

1010 1011 1012 1013 1014
/*
 * Timestamp stored at `rowIndex` of the primary-timestamp column, or 0 when no such column is
 * attached to the input (pTsColInfo == NULL).
 */
static FORCE_INLINE TSKEY getRowPTs(SColumnInfoData* pTsColInfo, int32_t rowIndex) {
  return (pTsColInfo != NULL) ? *(TSKEY*)colDataGetData(pTsColInfo, rowIndex) : 0;
}

1018 1019
// This ordinary first function does not care if current scan is ascending order or descending order scan
// the OPTIMIZED version of first function will only handle the ascending order scan
1020
/*
 * first(): keep the non-NULL value with the smallest timestamp seen so far. The intermediate
 * buffer holds the value (`bytes` wide) followed by its TSKEY.
 *
 * The timestamp order of the block is detected from its first and last rows, and rows are then
 * visited from the oldest timestamp to the newest, so the scan can stop at the first row that
 * improves the stored result.
 */
int32_t firstFunction(SqlFunctionCtx* pCtx) {
  SResultRowEntryInfo*  pResInfo = GET_RES_INFO(pCtx);
  char*                 pBuf = GET_ROWCELL_INTERBUF(pResInfo);
  SInputColumnInfoData* pInput = &pCtx->input;
  SColumnInfoData*      pInputCol = pInput->pData[0];
  int32_t               bytes = pInputCol->info.bytes;

  // All null data column, return directly.
  if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) {
    ASSERT(pInputCol->hasNull == true);
    return 0;
  }

  SColumnDataAgg* pColAgg = pInput->colDataAggIsSet ? pInput->pColumnDataAgg[0] : NULL;

  TSKEY headTs = getRowPTs(pInput->pPTS, 0);
  TSKEY tailTs = getRowPTs(pInput->pPTS, pInput->totalRows - 1);
  bool  ascending = (headTs <= tailTs);

  // If the stored result is already older than the oldest row of the block, nothing can beat it.
  TSKEY oldestTs = ascending ? headTs : tailTs;
  if (pResInfo->numOfRes > 0 && *(TSKEY*)(pBuf + bytes) < oldestTs) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t numOfElems = 0;
  int32_t begin = pInput->startRowIndex;
  int32_t count = pInput->numOfRows;
  for (int32_t k = 0; k < count; ++k) {
    // Walk rows in increasing timestamp order whatever the physical order of the block.
    int32_t row = ascending ? (begin + k) : (begin + count - 1 - k);
    if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, row, pColAgg)) {
      continue;
    }

    numOfElems++;

    TSKEY rowTs = getRowPTs(pInput->pPTS, row);
    if (pResInfo->numOfRes != 0 && *(TSKEY*)(pBuf + bytes) <= rowTs) {
      continue;
    }

    memcpy(pBuf, colDataGetData(pInputCol, row), bytes);
    *(TSKEY*)(pBuf + bytes) = rowTs;
    //        DO_UPDATE_TAG_COLUMNS(pCtx, ts);
    pResInfo->numOfRes = 1;
    break;
  }

  SET_VAL(pResInfo, numOfElems, 1);
  return TSDB_CODE_SUCCESS;
}

1106
/*
 * last(): keep the non-NULL value with the largest timestamp seen so far. The intermediate buffer
 * holds the value (`bytes` wide) followed by its TSKEY.
 *
 * Rows are visited from the newest timestamp to the oldest (the block's order is detected from
 * its first and last rows); only the first non-NULL row visited is a candidate.
 */
int32_t lastFunction(SqlFunctionCtx* pCtx) {
  SResultRowEntryInfo*  pResInfo = GET_RES_INFO(pCtx);
  char*                 pBuf = GET_ROWCELL_INTERBUF(pResInfo);
  SInputColumnInfoData* pInput = &pCtx->input;
  SColumnInfoData*      pInputCol = pInput->pData[0];
  int32_t               bytes = pInputCol->info.bytes;

  // All null data column, return directly.
  if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) {
    ASSERT(pInputCol->hasNull == true);
    return 0;
  }

  SColumnDataAgg* pColAgg = pInput->colDataAggIsSet ? pInput->pColumnDataAgg[0] : NULL;

  TSKEY headTs = getRowPTs(pInput->pPTS, 0);
  TSKEY tailTs = getRowPTs(pInput->pPTS, pInput->totalRows - 1);
  bool  ascending = (headTs <= tailTs);

  int32_t numOfElems = 0;
  int32_t begin = pInput->startRowIndex;
  int32_t count = pInput->numOfRows;
  for (int32_t k = 0; k < count; ++k) {
    // Walk rows in decreasing timestamp order whatever the physical order of the block.
    int32_t row = ascending ? (begin + count - 1 - k) : (begin + k);
    if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, row, pColAgg)) {
      continue;
    }

    numOfElems++;

    TSKEY rowTs = getRowPTs(pInput->pPTS, row);
    if (pResInfo->numOfRes == 0 || *(TSKEY*)(pBuf + bytes) < rowTs) {
      memcpy(pBuf, colDataGetData(pInputCol, row), bytes);
      *(TSKEY*)(pBuf + bytes) = rowTs;
      //        DO_UPDATE_TAG_COLUMNS(pCtx, ts);
      pResInfo->numOfRes = 1;
    }

    // Every remaining row of this block is older than this one.
    break;
  }

  SET_VAL(pResInfo, numOfElems, 1);
  return TSDB_CODE_SUCCESS;
}
H
Haojun Liao 已提交
1171

H
Haojun Liao 已提交
1172 1173 1174 1175 1176
/* diff() carries only its SDiffInfo state (previous value, flags) from one block to the next. */
bool getDiffFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
  const size_t stateSize = sizeof(SDiffInfo);
  pEnv->calcMemSize = stateSize;
  return true;
}

1177
/*
 * Reset diff() state: no previous value yet, so the first non-NULL value only primes `prev`.
 */
bool diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
  bool ok = functionSetup(pCtx, pResInfo);
  if (!ok) {
    return false;
  }

  SDiffInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
  pInfo->hasPrev = false;
  pInfo->prev.i64 = 0;
  pInfo->firstOutput = false;
  // TODO: take both options from the function parameters instead of hard-coding them.
  pInfo->ignoreNegative = false;
  pInfo->includeNull = false;
  return true;
}

1191 1192 1193
/*
 * diff(): for every non-NULL input value, emit its difference from the previous non-NULL value.
 *
 * State carried across blocks in SDiffInfo: the previous value (prev.i64), whether one exists
 * (hasPrev) and, for descending scans, the timestamp of the carried-over row (prevTs).
 * Output values go to pCtx->pOutput starting at pCtx->offset; when pCtx->pTsOutput is set and the
 * input has a primary timestamp column, the matching timestamps are written at the same positions.
 *
 * Returns the number of output rows produced for this block. Only INT (ascending and descending
 * scans) and BIGINT (rows processed in input order) inputs are handled; other types emit nothing.
 */
int32_t diffFunction(SqlFunctionCtx* pCtx) {
  SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
  SDiffInfo*           pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo);

  SInputColumnInfoData* pInput = &pCtx->input;
  SColumnInfoData*      pInputCol = pInput->pData[0];

  // Before any value has been seen, the first non-NULL row only primes `prev` and emits nothing,
  // so output positions and the returned count lag the processed-row count by one.
  bool    isFirstBlock = (pDiffInfo->hasPrev == false);
  int32_t numOfElems = 0;

  SColumnInfoData* pTsOutput = pCtx->pTsOutput;
  // The primary timestamp column may be absent (getRowPTs() tolerates the same case).
  TSKEY* tsList = (pInput->pPTS != NULL) ? (TSKEY*)pInput->pPTS->pData : NULL;
  // Emit timestamps only when there is both a destination column and a source column.
  bool outputTs = (pTsOutput != NULL && tsList != NULL);

  int32_t startOffset = pCtx->offset;
  int32_t start = pInput->startRowIndex;
  int32_t end = pInput->startRowIndex + pInput->numOfRows;  // exclusive

  switch (pInputCol->info.type) {
    case TSDB_DATA_TYPE_INT: {
      SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
      if (pCtx->order == TSDB_ORDER_ASC) {
        for (int32_t i = start; i < end; i += 1) {
          // NOTE(review): with includeNull set, a NULL preceding the very first value yields
          // pos == startOffset - 1; includeNull is currently initialised to false in setup.
          int32_t pos = startOffset + (isFirstBlock ? (numOfElems - 1) : numOfElems);
          if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
            if (pDiffInfo->includeNull) {
              colDataSetNull_f(pOutput->nullbitmap, pos);
              if (outputTs) {
                colDataAppendInt64(pTsOutput, pos, &tsList[i]);
              }

              numOfElems += 1;
            }
            continue;
          }

          int32_t v = *(int32_t*)colDataGetData(pInputCol, i);
          if (pDiffInfo->hasPrev) {
            // computed in 64 bits (prev.i64 is int64), then narrowed to the INT output
            int32_t delta = (int32_t)(v - pDiffInfo->prev.i64);
            if (delta < 0 && pDiffInfo->ignoreNegative) {
              colDataSetNull_f(pOutput->nullbitmap, pos);
            } else {
              colDataAppendInt32(pOutput, pos, &delta);
            }

            if (outputTs) {
              colDataAppendInt64(pTsOutput, pos, &tsList[i]);
            }
          }

          pDiffInfo->prev.i64 = v;
          pDiffInfo->hasPrev = true;
          numOfElems++;
        }
      } else {
        // Descending scan: row i's diff is v[i] - v[i + 1]. The last row of a block is carried to
        // the next block (prev/prevTs) and paired there with that block's first row.
        // A carried-over output occupies the first slot of this block, so the outputs of this
        // block's own rows start one slot later; otherwise both would land on the same position.
        int32_t shift = pDiffInfo->hasPrev ? 1 : 0;
        for (int32_t i = start; i < end; i += 1) {
          int32_t v = *(int32_t*)colDataGetData(pInputCol, i);
          int32_t pos = startOffset + numOfElems;

          // there is a row of previous data block to be handled in the first place.
          if (pDiffInfo->hasPrev) {
            int32_t delta = (int32_t)(pDiffInfo->prev.i64 - v);
            if (delta < 0 && pDiffInfo->ignoreNegative) {
              colDataSetNull_f(pOutput->nullbitmap, pos);
            } else {
              colDataAppendInt32(pOutput, pos, &delta);
            }

            if (outputTs) {
              colDataAppendInt64(pTsOutput, pos, &pDiffInfo->prevTs);
            }
            pDiffInfo->hasPrev = false;
          }

          // it is not the last row of current block
          if (i < end - 1) {
            int32_t next = *(int32_t*)colDataGetData(pInputCol, i + 1);

            // widen before subtracting: int32 - int32 may overflow, which is undefined behaviour
            int32_t delta = (int32_t)((int64_t)v - next);
            if (delta < 0 && pDiffInfo->ignoreNegative) {
              colDataSetNull_f(pOutput->nullbitmap, pos + shift);
            } else {
              colDataAppendInt32(pOutput, pos + shift, &delta);
            }

            if (outputTs) {
              colDataAppendInt64(pTsOutput, pos + shift, &tsList[i]);
            }
          } else {
            pDiffInfo->prev.i64 = v;
            if (outputTs) {
              pDiffInfo->prevTs = tsList[i];
            }
            pDiffInfo->hasPrev = true;
          }
          numOfElems++;
        }
      }
      break;
    }

    case TSDB_DATA_TYPE_BIGINT: {
      SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
      // Rows are processed in input order regardless of pCtx->order.
      for (int32_t i = start; i < end; i += 1) {
        if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
          continue;
        }

        // Always load the full 64-bit value: `prev` must be primed by the first row as well.
        int64_t v = *(int64_t*)colDataGetData(pInputCol, i);
        if (pDiffInfo->hasPrev) {
          int32_t pos = startOffset + (isFirstBlock ? (numOfElems - 1) : numOfElems);
          // subtract in unsigned arithmetic so that overflow wraps instead of being undefined
          int64_t delta = (int64_t)((uint64_t)v - (uint64_t)pDiffInfo->prev.i64);
          if (delta < 0 && pDiffInfo->ignoreNegative) {
            colDataSetNull_f(pOutput->nullbitmap, pos);
          } else {
            colDataAppendInt64(pOutput, pos, &delta);
          }

          if (outputTs) {
            colDataAppendInt64(pTsOutput, pos, &tsList[i]);
          }
        }

        pDiffInfo->prev.i64 = v;
        pDiffInfo->hasPrev = true;
        numOfElems++;
      }
      break;
    }

    default:
      // TODO: FLOAT, DOUBLE, SMALLINT, TINYINT and unsigned inputs are not supported yet.
      break;
  }

  // initial value is not set yet
  if (numOfElems <= 0) {
    /*
     * 1. current block and blocks before are full of null
     * 2. current block may be null value
     */
    assert(pCtx->hasNull);
    return 0;
  }

  return isFirstBlock ? numOfElems - 1 : numOfElems;
}
H
Haojun Liao 已提交
1433

1434
/*
 * top()/bottom() state: an STopBotRes header immediately followed by k STopBotResItem slots,
 * where k is the value of function parameter 1.
 */
bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
  SValueNode* pKNode = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);
  pEnv->calcMemSize = sizeof(STopBotRes) + pKNode->datum.i * sizeof(STopBotResItem);
  return true;
}

1440 1441 1442 1443
/*
 * Return the top/bottom state of the current result row. The item array lives directly after the
 * STopBotRes header in the same buffer, so pItems is re-pointed on every access rather than
 * trusting a previously stored pointer.
 */
static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) {
  STopBotRes* pRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
  pRes->pItems = (STopBotResItem*)(pRes + 1);
  return pRes;
}

1448 1449
// Offer the value pData (row `rowIndex` of pSrcBlock, of data type `type`) to the bounded
// top/bottom result set of pCtx; pEntryInfo->numOfRes is the number of items currently kept.
static void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
                            uint64_t uid, SResultRowEntryInfo* pEntryInfo);
1450

1451 1452 1453
// Persist the whole source row `rowIndex` of pSrcBlock for pItem in the function's page buffer:
// saveTupleData() appends it to a (possibly new) page and records the position in pItem->tuplePos;
// copyTupleData() overwrites the slot already referenced by pItem->tuplePos.
static void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem);
static void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem);

1454 1455 1456
/*
 * top(): offer every non-NULL value of the current input slice to the bounded result set
 * maintained by doAddIntoResult().
 */
int32_t topFunction(SqlFunctionCtx* pCtx) {
  SResultRowEntryInfo*  pResInfo = GET_RES_INFO(pCtx);
  SInputColumnInfoData* pInput = &pCtx->input;
  SColumnInfoData*      pCol = pInput->pData[0];
  int32_t               type = pCol->info.type;

  int32_t first = pInput->startRowIndex;
  int32_t last = first + pInput->numOfRows;  // exclusive
  for (int32_t row = first; row < last; ++row) {
    bool isNullRow = pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, row);
    if (!isNullRow) {
      char* pValue = colDataGetData(pCol, row);
      doAddIntoResult(pCtx, pValue, row, pCtx->pSrcBlock, type, pInput->uid, pResInfo);
    }
  }

  return TSDB_CODE_SUCCESS;
}

1483 1484
static int32_t topBotResComparFn(const void* p1, const void* p2, const void* param) {
  uint16_t type = *(uint16_t*)param;
1485

1486 1487
  STopBotResItem* val1 = (STopBotResItem*)p1;
  STopBotResItem* val2 = (STopBotResItem*)p2;
1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509

  if (IS_SIGNED_NUMERIC_TYPE(type)) {
    if (val1->v.i == val2->v.i) {
      return 0;
    }

    return (val1->v.i > val2->v.i) ? 1 : -1;
  } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
    if (val1->v.u == val2->v.u) {
      return 0;
    }

    return (val1->v.u > val2->v.u) ? 1 : -1;
  }

  if (val1->v.d == val2->v.d) {
    return 0;
  }

  return (val1->v.d > val2->v.d) ? 1 : -1;
}

1510 1511
/*
 * Offer one value to the top() result set, which keeps at most param[1] items.
 * While there is room, the value is always kept and its source row saved. Once full, it only
 * replaces pItems[0] when strictly greater. That test treats pItems[0] as the smallest kept item,
 * presumably the invariant taosheapsort()/taosheapadjust() maintain with topBotResComparFn
 * (TODO confirm).
 */
void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
                     uint64_t uid, SResultRowEntryInfo* pEntryInfo) {
  STopBotRes*     pRes = getTopBotOutputInfo(pCtx);
  int32_t         maxSize = pCtx->param[1].param.i;
  STopBotResItem* pItems = pRes->pItems;
  assert(pItems != NULL);

  SVariant val = {0};
  taosVariantCreateFromBinary(&val, pData, tDataTypes[type].bytes, type);

  if (pEntryInfo->numOfRes >= maxSize) {
    bool greater = (IS_SIGNED_NUMERIC_TYPE(type) && val.i > pItems[0].v.i) ||
                   (IS_UNSIGNED_NUMERIC_TYPE(type) && val.u > pItems[0].v.u) ||
                   (IS_FLOAT_TYPE(type) && val.d > pItems[0].v.d);
    if (greater) {
      STopBotResItem* pMin = &pItems[0];
      pMin->v = val;
      pMin->uid = uid;

      // Reuse the tuple slot owned by the evicted item.
      copyTupleData(pCtx, rowIndex, pSrcBlock, pMin);

      taosheapadjust((void*)pItems, sizeof(STopBotResItem), 0, pEntryInfo->numOfRes - 1, (const void*)&type,
                     topBotResComparFn, NULL, false);
    }
    return;
  }

  STopBotResItem* pSlot = &pItems[pEntryInfo->numOfRes];
  pSlot->v = val;
  pSlot->uid = uid;

  // Keep the whole source row in a freshly allocated tuple slot.
  saveTupleData(pCtx, rowIndex, pSrcBlock, pSlot);

  pEntryInfo->numOfRes++;
  taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn,
               false);
}
1550

1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604
/*
 * Append the complete row `rowIndex` of pSrcBlock to the function's page buffer and record where
 * it went in pItem->tuplePos (page id + byte offset within the page).
 *
 * Stored layout: one bool NULL flag per column, then one slot per column of exactly
 * pCol->info.bytes bytes. topBotFinalize() finds column k by summing the widths of columns
 * 0..k-1, so every column keeps its slot even when it is NULL.
 */
void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem) {
  SFilePage* pPage = NULL;

  int32_t completeRowSize = pSrcBlock->info.rowSize + pSrcBlock->info.numOfCols * sizeof(bool);

  // Reuse the current page while the row still fits, otherwise start a new one.
  if (pCtx->curBufPage == -1) {
    pPage = getNewBufPage(pCtx->pBuf, 0, &pCtx->curBufPage);
    pPage->num = sizeof(SFilePage);
  } else {
    pPage = getBufPage(pCtx->pBuf, pCtx->curBufPage);
    if (pPage->num + completeRowSize > getBufPageSize(pCtx->pBuf)) {
      pPage = getNewBufPage(pCtx->pBuf, 0, &pCtx->curBufPage);
      pPage->num = sizeof(SFilePage);
    }
  }

  pItem->tuplePos.pageId = pCtx->curBufPage;
  pItem->tuplePos.offset = pPage->num;

  int32_t offset = 0;
  bool*   nullList = (bool*)((char*)pPage + pPage->num);
  char*   pStart = (char*)(nullList + sizeof(bool) * pSrcBlock->info.numOfCols);
  for (int32_t i = 0; i < pSrcBlock->info.numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, i);

    // Write the flag in both cases: nothing here clears the page region beforehand.
    nullList[i] = colDataIsNull_s(pCol, rowIndex);
    if (!nullList[i]) {
      char* p = colDataGetData(pCol, rowIndex);
      if (IS_VAR_DATA_TYPE(pCol->info.type)) {
        memcpy(pStart + offset, p, varDataTLen(p));
      } else {
        memcpy(pStart + offset, p, pCol->info.bytes);
      }
    }

    // Advance past the full column width even for NULL, keeping later columns at fixed positions.
    offset += pCol->info.bytes;
  }

  pPage->num += completeRowSize;

  setBufPageDirty(pPage, true);
  releaseBufPage(pCtx->pBuf, pPage);
}

/*
 * Overwrite, in place, the tuple stored at pItem->tuplePos with row `rowIndex` of pSrcBlock:
 * one bool NULL flag per column followed by one slot of pCol->info.bytes bytes per column.
 * topBotFinalize() finds column k by summing the widths of columns 0..k-1, so every column keeps
 * its slot even when it is NULL.
 */
void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem) {
  SFilePage* pPage = getBufPage(pCtx->pBuf, pItem->tuplePos.pageId);

  bool* nullList = (bool*)((char*)pPage + pItem->tuplePos.offset);
  char* pStart = (char*)(nullList + pSrcBlock->info.numOfCols * sizeof(bool));

  int32_t offset = 0;
  for (int32_t i = 0; i < pSrcBlock->info.numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, i);

    nullList[i] = colDataIsNull_s(pCol, rowIndex);
    if (!nullList[i]) {
      char* p = colDataGetData(pCol, rowIndex);
      if (IS_VAR_DATA_TYPE(pCol->info.type)) {
        memcpy(pStart + offset, p, varDataTLen(p));
      } else {
        memcpy(pStart + offset, p, pCol->info.bytes);
      }
    }

    // Advance past the full column width even for NULL, keeping later columns at fixed positions.
    offset += pCol->info.bytes;
  }

  setBufPageDirty(pPage, true);
  releaseBufPage(pCtx->pBuf, pPage);
}

int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
1626 1627
  SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
  STopBotRes*          pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
1628 1629
  pEntryInfo->complete = true;

1630 1631
  int32_t          type = pCtx->input.pData[0]->info.type;
  int32_t          slotId = pCtx->pExpr->base.resSchema.slotId;
1632 1633 1634 1635
  SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);

  // todo assign the tag value and the corresponding row data
  int32_t currentRow = pBlock->info.rows;
1636
  switch (type) {
1637 1638 1639
    case TSDB_DATA_TYPE_INT: {
      for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) {
        STopBotResItem* pItem = &pRes->pItems[i];
1640
        colDataAppendInt32(pCol, currentRow, (int32_t*)&pItem->v.i);
1641 1642 1643

        int32_t pageId = pItem->tuplePos.pageId;
        int32_t offset = pItem->tuplePos.offset;
1644 1645
        if (pItem->tuplePos.pageId != -1) {
          SFilePage* pPage = getBufPage(pCtx->pBuf, pageId);
H
Haojun Liao 已提交
1646

1647 1648 1649 1650 1651 1652 1653
          bool* nullList = (bool*)((char*)pPage + offset);
          char* pStart = (char*)(nullList + pCtx->pSrcBlock->info.numOfCols * sizeof(bool));

          // todo set the offset value to optimize the performance.
          for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
            SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];

1654 1655 1656
            SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0];
            int32_t      srcSlotId = pFuncParam->pCol->slotId;
            int32_t      dstSlotId = pCtx->pExpr->base.resSchema.slotId;
1657 1658

            int32_t ps = 0;
1659
            for (int32_t k = 0; k < srcSlotId; ++k) {
1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670
              SColumnInfoData* pSrcCol = taosArrayGet(pCtx->pSrcBlock->pDataBlock, k);
              ps += pSrcCol->info.bytes;
            }

            SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
            if (nullList[srcSlotId]) {
              colDataAppendNULL(pDstCol, currentRow);
            } else {
              colDataAppend(pDstCol, currentRow, (pStart + ps), false);
            }
          }
1671
        }
1672 1673

        currentRow += 1;
1674
      }
H
Haojun Liao 已提交
1675

1676 1677 1678 1679 1680
      break;
    }
  }

  return pEntryInfo->numOfRes;
1681
}
G
Ganlin Zhao 已提交
1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765 1766 1767 1768 1769 1770 1771 1772 1773 1774 1775 1776 1777 1778 1779

bool getSpreadFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
  // SPREAD needs one SSpreadInfo worth of intermediate state (running
  // min/max, result and hasResult flag); the function node itself is unused.
  const size_t interBufSize = sizeof(SSpreadInfo);
  pEnv->calcMemSize = interBufSize;
  return true;
}

/*
 * Initialise the SPREAD intermediate state for a result row.
 *
 * Runs the generic functionSetup() first; on failure nothing else is touched
 * and false is returned. On success the running bounds are seeded inverted
 * (min = DBL_MAX, max = -DBL_MAX) so the first value observed replaces both,
 * and hasResult is cleared.
 */
bool spreadFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
  bool setupOk = functionSetup(pCtx, pResultInfo);
  if (setupOk) {
    SSpreadInfo* pSpread = GET_ROWCELL_INTERBUF(pResultInfo);
    SET_DOUBLE_VAL(&pSpread->max, -DBL_MAX);
    SET_DOUBLE_VAL(&pSpread->min, DBL_MAX);
    pSpread->hasResult = false;
  }
  return setupOk;
}

/*
 * Fold one input block into the SPREAD intermediate state (running min/max
 * stored as doubles in SSpreadInfo).
 *
 * Two input paths:
 *  - colDataAggIsSet: only the block's pre-computed statistics
 *    (min, max, numOfNull) are used; the raw column data is not read.
 *  - otherwise: every non-NULL row in
 *    [startRowIndex, startRowIndex + numOfRows) is converted to double and
 *    compared against the running min/max.
 *
 * The result entry is marked as having output (SET_VAL and hasResult) only
 * when at least one non-NULL value was seen.
 *
 * Returns TSDB_CODE_SUCCESS.
 */
int32_t spreadFunction(SqlFunctionCtx *pCtx) {
  int32_t numOfElems = 0;

  SInputColumnInfoData* pInput = &pCtx->input;
  SColumnDataAgg*       pAgg = pInput->pColumnDataAgg[0];
  int32_t               type = pInput->pData[0]->info.type;

  SSpreadInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));

  if (pInput->colDataAggIsSet) {
    // Only the pre-computed block statistics are loaded, not the actual data.
    numOfElems = pInput->numOfRows - pAgg->numOfNull;
    if (numOfElems == 0) {
      goto _spread_over;
    }

    // pAgg->min/max are reinterpreted according to the column type
    // (int64, double or uint64) before being widened to double.
    // NOTE(review): any other type leaves tmin/tmax at 0.0 and would fold a
    // spurious 0 into the range -- confirm no other input type (e.g.
    // TIMESTAMP or BOOL) can reach this path.
    double tmin = 0.0, tmax = 0.0;
    if (IS_SIGNED_NUMERIC_TYPE(type)) {
      tmin = (double)GET_INT64_VAL(&pAgg->min);
      tmax = (double)GET_INT64_VAL(&pAgg->max);
    } else if (IS_FLOAT_TYPE(type)) {
      tmin = GET_DOUBLE_VAL(&pAgg->min);
      tmax = GET_DOUBLE_VAL(&pAgg->max);
    } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
      tmin = (double)GET_UINT64_VAL(&pAgg->min);
      tmax = (double)GET_UINT64_VAL(&pAgg->max);
    }

    if (GET_DOUBLE_VAL(&pInfo->min) > tmin) {
      SET_DOUBLE_VAL(&pInfo->min, tmin);
    }

    if (GET_DOUBLE_VAL(&pInfo->max) < tmax) {
      SET_DOUBLE_VAL(&pInfo->max, tmax);
    }

  } else {  // no statistics: scan the actual data block row by row
    SColumnInfoData* pCol = pInput->pData[0];

    // Compute the scan window once; the previously declared numOfRows local
    // was never used and the bound was re-read on every iteration.
    int32_t start = pInput->startRowIndex;
    int32_t end   = start + pInput->numOfRows;

    for (int32_t i = start; i < end; ++i) {
      if (colDataIsNull_f(pCol->nullbitmap, i)) {
        continue;
      }

      char*  data = colDataGetData(pCol, i);
      double v = 0;
      GET_TYPED_DATA(v, double, type, data);

      if (v < GET_DOUBLE_VAL(&pInfo->min)) {
        SET_DOUBLE_VAL(&pInfo->min, v);
      }

      if (v > GET_DOUBLE_VAL(&pInfo->max)) {
        SET_DOUBLE_VAL(&pInfo->max, v);
      }

      numOfElems += 1;
    }
  }

_spread_over:
  // If every value in the block was NULL (numOfElems == 0), nothing is output.
  SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
  if (numOfElems > 0) {
    pInfo->hasResult = true;
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Produce the SPREAD result (max - min) and hand off to the generic finalizer.
 *
 * The result is written only when hasResult is set, i.e. at least one
 * non-NULL value was accumulated; otherwise the result slot is left as-is.
 * The return value is whatever functionFinalize() returns.
 */
int32_t spreadFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
  SSpreadInfo* pSpread = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));

  if (pSpread->hasResult) {
    double range = pSpread->max - pSpread->min;
    SET_DOUBLE_VAL(&pSpread->result, range);
  }

  return functionFinalize(pCtx, pBlock);
}