builtinsimpl.c 55.5 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "builtinsimpl.h"
17
#include "function.h"
18
#include "querynodes.h"
H
Haojun Liao 已提交
19 20
#include "taggfunction.h"
#include "tdatablock.h"
21
#include "tpercentile.h"
H
Haojun Liao 已提交
22

G
Ganlin Zhao 已提交
23 24 25 26 27 28 29 30 31 32 33 34 35 36
/*
 * Running total kept by SUM/AVG. The three members share storage; which one
 * is live depends on the input column type (isum for signed integers, usum
 * for unsigned integers, dsum for float/double).
 */
struct SSumRes {
  union {
    int64_t  isum;
    uint64_t usum;
    double   dsum;
  };
};
typedef struct SSumRes SSumRes;

/*
 * Intermediate state of AVG: the running sum, the number of non-null values
 * folded into it, and the final quotient written by avgFinalize().
 */
struct SAvgRes {
  double  result;  // sum / count, filled in at finalize time
  SSumRes sum;     // running total (isum or dsum depending on column type)
  int64_t count;   // number of non-null values accumulated
};
typedef struct SAvgRes SAvgRes;

37 38
/* One candidate row kept by the top/bottom result list. */
typedef struct STopBotResItem {
  SVariant v;
  uint64_t uid;  // it is a table uid, used to extract tag data during building of the final result for the tag data
  struct {
    int32_t pageId;
    int32_t offset;
  } tuplePos;  // tuple data of this chosen row
} STopBotResItem;

G
Ganlin Zhao 已提交
46
typedef struct STopBotRes {
47
  STopBotResItem* pItems;
G
Ganlin Zhao 已提交
48 49 50 51 52
} STopBotRes;

/*
 * Intermediate state of STDDEV. Integer columns accumulate into the int64
 * members, float/double columns into the double members of each union.
 */
typedef struct SStddevRes {
  double  result;  // final standard deviation, written by stddevFinalize()
  int64_t count;   // number of non-null values accumulated
  union {
    double  quadraticDSum;  // sum of squares (floating-point input)
    int64_t quadraticISum;  // sum of squares (integer input)
  };
  union {
    double  dsum;  // plain sum (floating-point input)
    int64_t isum;  // plain sum (integer input)
  };
} SStddevRes;

typedef struct SPercentileInfo {
  double      result;
65
  tMemBucket* pMemBucket;
G
Ganlin Zhao 已提交
66 67 68 69 70 71 72
  int32_t     stage;
  double      minval;
  double      maxval;
  int64_t     numOfElems;
} SPercentileInfo;

/* State carried between rows by the diff computation. */
typedef struct SDiffInfo {
  bool hasPrev;
  bool includeNull;
  bool ignoreNegative;
  bool firstOutput;
  union {
    int64_t i64;
    double  d64;
  } prev;  // previous value, stored as integer or double
} SDiffInfo;

G
Ganlin Zhao 已提交
83 84 85 86 87 88 89
/* Intermediate state of the spread computation: observed min/max and the result. */
struct SSpreadInfo {
  double result;
  bool   hasResult;
  double min;
  double max;
};
typedef struct SSpreadInfo SSpreadInfo;

90 91 92 93 94 95
// Sets (_info)->numOfRes to `res`, but only when `numOfElem` is positive;
// with numOfElem <= 0 the entry is left untouched.
#define SET_VAL(_info, numOfElem, res) \
  do {                                 \
    if ((numOfElem) <= 0) {            \
      break;                           \
    }                                  \
    (_info)->numOfRes = (res);         \
  } while (0)

G
Ganlin Zhao 已提交
98 99 100 101 102 103
// GET_TS_LIST views (x)->ptsList as a TSKEY array; GET_TS_DATA reads element y of it.
// Neither macro checks ptsList for NULL; callers do that themselves.
#define GET_TS_LIST(x)    ((TSKEY*)((x)->ptsList))
#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)])

// Invokes fpSet.process on every tag context listed in (ctx)->tagInfo.
// NOTE(review): the expansion ends in "while (0);" (with a semicolon), so the
// macro cannot be used as the body of an unbraced if/else. The text is kept
// as-is because this macro is defined a second time further down in this file
// and both definitions must stay token-identical.
#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx)                      \
  do {                                                             \
    for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \
      SqlFunctionCtx* __ctx = (ctx)->tagInfo.pTagCtxList[_i];      \
      __ctx->fpSet.process(__ctx);                                 \
    }                                                              \
  } while (0);

// Conditionally replaces `left` with `right`.
//   sign == 0 (max): replace when left <  right
//   sign == 1 (min): replace when !(left < right), i.e. also when they are equal
// On replacement the subsidiary contexts of `ctx` are refreshed with timestamp
// `_ts` (DO_UPDATE_SUBSID_RES) and `num` is incremented by one.
#define UPDATE_DATA(ctx, left, right, num, sign, _ts) \
  do {                                                \
    if (((left) < (right)) ^ (sign)) {                \
      (left) = (right);                               \
      DO_UPDATE_SUBSID_RES(ctx, _ts);                 \
      (num) += 1;                                     \
    }                                                 \
  } while (0)

// Scans rows [_start, _start + _nrow) of column `_col` (elements of type _t),
// skipping rows flagged in the null bitmap, and feeds each value through
// UPDATE_DATA against `val`. The row timestamp is taken from the context's
// ptsList when present, otherwise 0. `num` ends up incremented once per
// replacement performed by UPDATE_DATA, not once per non-null row.
#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num)        \
  do {                                                                   \
    _t* d = (_t*)((_col)->pData);                                        \
    for (int32_t i = (_start); i < (_nrow) + (_start); ++i) {            \
      if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \
        continue;                                                        \
      }                                                                  \
      TSKEY ts = (ctx)->ptsList != NULL ? GET_TS_DATA(ctx, i) : 0;       \
      UPDATE_DATA(ctx, val, d[i], num, sign, ts);                        \
    }                                                                    \
  } while (0)

130
/**
 * Common first-time initialisation of a result entry.
 *
 * @param pCtx         function context; its output buffer (if any) is zeroed
 *                     over resDataInfo.bytes bytes
 * @param pResultInfo  result entry to initialise
 * @return false if the entry was already initialised (nothing is touched),
 *         true after a fresh initialisation
 */
bool functionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
  if (pResultInfo->initialized) {
    return false;
  }

  if (pCtx->pOutput != NULL) {
    memset(pCtx->pOutput, 0, (size_t)pCtx->resDataInfo.bytes);
  }

  initResultRowEntry(pResultInfo, pCtx->resDataInfo.interBufSize);
  return true;
}

143
/**
 * Generic finalize step: appends the entry's intermediate buffer to the output
 * column of this function (slot taken from the expression's result schema) at
 * row index pBlock->info.rows.
 *
 * The result is flagged as NULL when no result was produced (numOfRes == 0).
 *
 * @return the entry's numOfRes
 */
int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
  int32_t          slotId = pCtx->pExpr->base.resSchema.slotId;
  SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);

  SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
  pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0;
  // NOTE(review): unlike functionFinalizeWithResultBuf(), cleanupResultRowEntry()
  // is not called here (the call was commented out in the original) - confirm intent.

  char* in = GET_ROWCELL_INTERBUF(pResInfo);
  colDataAppend(pCol, pBlock->info.rows, in, pResInfo->isNullRes);

  return pResInfo->numOfRes;
}

157
/**
 * Finalize variant that emits a caller-supplied buffer instead of the entry's
 * intermediate buffer.
 *
 * The result is flagged as NULL when numOfRes == 0, the result entry is cleaned
 * up, and `finalResult` is appended to the function's output column at row
 * index pBlock->info.rows.
 *
 * @return the entry's numOfRes (read after the cleanup call)
 */
int32_t functionFinalizeWithResultBuf(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, char* finalResult) {
  int32_t          slotId = pCtx->pExpr->base.resSchema.slotId;
  SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);

  SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
  pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0;
  cleanupResultRowEntry(pResInfo);

  char* in = finalResult;
  colDataAppend(pCol, pBlock->info.rows, in, pResInfo->isNullRes);

  return pResInfo->numOfRes;
}

171 172 173
/**
 * Tells the caller how much data COUNT needs for a block.
 *
 * When the first parameter is the primary-key timestamp column no data has to
 * be loaded; for any other parameter the block statistics are requested.
 * pTimeWindow is not consulted.
 */
EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) {
  SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
  if (QUERY_NODE_COLUMN == nodeType(pParam) && PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pParam)->colId) {
    return FUNC_DATA_REQUIRED_NOT_LOAD;
  }
  return FUNC_DATA_REQUIRED_STATIS_LOAD;
}
H
Haojun Liao 已提交
178 179 180 181 182 183 184 185 186 187

/* Reports the intermediate-buffer size of COUNT: a single 64-bit counter. Always succeeds. */
bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
  // countFunction() accumulates straight into an int64_t stored in the buffer
  pEnv->calcMemSize = sizeof(int64_t);

  return true;
}

/*
 * count function does need the finalize, if data is missing, the default value, which is 0, is used
 * count function does not use the pCtx->interResBuf to keep the intermediate buffer
 */
188
/**
 * Adds the number of non-null values of the current input to the int64 counter
 * stored in the entry's intermediate buffer.
 *
 * @return TSDB_CODE_SUCCESS
 */
int32_t countFunction(SqlFunctionCtx* pCtx) {
  int32_t numOfElem = 0;

  /*
   * 1. column data missing (schema modified) causes pInputCol->hasNull == true. pInput->colDataAggIsSet == true;
   * 2. for general non-primary key columns, pInputCol->hasNull may be true or false, pInput->colDataAggIsSet == true;
   * 3. for primary key column, pInputCol->hasNull always be false, pInput->colDataAggIsSet == false;
   */
  SInputColumnInfoData* pInput = &pCtx->input;
  SColumnInfoData*      pInputCol = pInput->pData[0];

  if (pInput->colDataAggIsSet && pInput->totalRows == pInput->numOfRows) {
    // the whole block is covered: use the pre-computed null count
    numOfElem = pInput->numOfRows - pInput->pColumnDataAgg[0]->numOfNull;
    ASSERT(numOfElem >= 0);
  } else if (pInputCol->hasNull) {
    // count row by row, skipping nulls
    for (int32_t i = pInput->startRowIndex; i < pInput->startRowIndex + pInput->numOfRows; ++i) {
      if (colDataIsNull(pInputCol, pInput->totalRows, i, NULL)) {
        continue;
      }
      numOfElem += 1;
    }
  } else {
    // when counting on the primary time stamp column and no statistics data is presented, use the size value
    // directly.
    numOfElem = pInput->numOfRows;
  }

  SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
  char*                buf = GET_ROWCELL_INTERBUF(pResInfo);
  *((int64_t*)buf) += numOfElem;

  // only mark a result as present when at least one value was counted
  SET_VAL(pResInfo, numOfElem, 1);
  return TSDB_CODE_SUCCESS;
}

// Adds every non-null element (type _t) of rows [_start, _start + _rows) of
// column `_col` to `_res`, and increments `numOfElem` once per element added.
#define LIST_ADD_N(_res, _col, _start, _rows, _t, numOfElem)             \
  do {                                                                   \
    _t* d = (_t*)((_col)->pData);                                        \
    for (int32_t i = (_start); i < (_rows) + (_start); ++i) {            \
      if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \
        continue;                                                        \
      }                                                                  \
      (_res) += (d)[i];                                                  \
      (numOfElem)++;                                                     \
    }                                                                    \
  } while (0)

236
/**
 * Accumulates SUM for the current input into the SSumRes kept in the entry's
 * intermediate buffer.
 *
 * When block statistics are available (colDataAggIsSet) the pre-computed sum is
 * added; otherwise the rows [startRowIndex, startRowIndex + numOfRows) are
 * summed directly, skipping nulls. Signed integers and BOOL go to isum,
 * unsigned integers to usum, float/double to dsum.
 *
 * @return TSDB_CODE_SUCCESS
 */
int32_t sumFunction(SqlFunctionCtx* pCtx) {
  int32_t numOfElem = 0;

  SInputColumnInfoData* pInput = &pCtx->input;
  SColumnDataAgg*       pAgg = pInput->pColumnDataAgg[0];
  int32_t               type = pInput->pData[0]->info.type;

  SSumRes* pSumRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));

  if (pInput->colDataAggIsSet) {
    // only the pre-computed statistics are loaded, not the actual data
    numOfElem = pInput->numOfRows - pAgg->numOfNull;
    ASSERT(numOfElem >= 0);

    if (IS_SIGNED_NUMERIC_TYPE(type)) {
      pSumRes->isum += pAgg->sum;
    } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
      pSumRes->usum += pAgg->sum;
    } else if (IS_FLOAT_TYPE(type)) {
      // the statistics slot holds the bit pattern of a double here
      pSumRes->dsum += GET_DOUBLE_VAL((const char*)&(pAgg->sum));
    }
  } else {  // computing based on the true data block
    SColumnInfoData* pCol = pInput->pData[0];

    int32_t start = pInput->startRowIndex;
    int32_t numOfRows = pInput->numOfRows;

    if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL) {
      if (type == TSDB_DATA_TYPE_TINYINT || type == TSDB_DATA_TYPE_BOOL) {
        LIST_ADD_N(pSumRes->isum, pCol, start, numOfRows, int8_t, numOfElem);
      } else if (type == TSDB_DATA_TYPE_SMALLINT) {
        LIST_ADD_N(pSumRes->isum, pCol, start, numOfRows, int16_t, numOfElem);
      } else if (type == TSDB_DATA_TYPE_INT) {
        LIST_ADD_N(pSumRes->isum, pCol, start, numOfRows, int32_t, numOfElem);
      } else if (type == TSDB_DATA_TYPE_BIGINT) {
        LIST_ADD_N(pSumRes->isum, pCol, start, numOfRows, int64_t, numOfElem);
      }
    } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
      if (type == TSDB_DATA_TYPE_UTINYINT) {
        LIST_ADD_N(pSumRes->usum, pCol, start, numOfRows, uint8_t, numOfElem);
      } else if (type == TSDB_DATA_TYPE_USMALLINT) {
        LIST_ADD_N(pSumRes->usum, pCol, start, numOfRows, uint16_t, numOfElem);
      } else if (type == TSDB_DATA_TYPE_UINT) {
        LIST_ADD_N(pSumRes->usum, pCol, start, numOfRows, uint32_t, numOfElem);
      } else if (type == TSDB_DATA_TYPE_UBIGINT) {
        LIST_ADD_N(pSumRes->usum, pCol, start, numOfRows, uint64_t, numOfElem);
      }
    } else if (type == TSDB_DATA_TYPE_DOUBLE) {
      LIST_ADD_N(pSumRes->dsum, pCol, start, numOfRows, double, numOfElem);
    } else if (type == TSDB_DATA_TYPE_FLOAT) {
      LIST_ADD_N(pSumRes->dsum, pCol, start, numOfRows, float, numOfElem);
    }
  }

  // data in the check operation are all null, not output
  SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
295
/* Reports the intermediate-buffer size of SUM: one SSumRes. Always succeeds. */
bool getSumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
  pEnv->calcMemSize = sizeof(SSumRes);
  return true;
}

G
Ganlin Zhao 已提交
300 301 302 303 304
/*
 * Reports the intermediate-buffer size of AVG. Always succeeds.
 *
 * The buffer must hold a whole SAvgRes: avgFunctionSetup() clears
 * sizeof(SAvgRes) bytes of it and avgFunction()/avgFinalize() read and write
 * its sum, count and result members. Reserving only sizeof(double) let those
 * accesses run past the end of the buffer.
 */
bool getAvgFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
  pEnv->calcMemSize = sizeof(SAvgRes);
  return true;
}

305
/**
 * Initialises the result entry for AVG and zeroes its SAvgRes state.
 *
 * @return false when the entry was already initialised (state left untouched),
 *         true otherwise
 */
bool avgFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
  if (!functionSetup(pCtx, pResultInfo)) {
    return false;
  }

  SAvgRes* pRes = GET_ROWCELL_INTERBUF(pResultInfo);
  memset(pRes, 0, sizeof(SAvgRes));
  return true;
}

/**
 * Accumulates AVG state (sum and count) over the non-null values in rows
 * [startRowIndex, startRowIndex + numOfRows) of the first input column.
 *
 * Signed integer columns add into sum.isum, float/double columns into
 * sum.dsum. Any other column type falls through the switch and contributes
 * nothing.
 *
 * @return TSDB_CODE_SUCCESS
 */
int32_t avgFunction(SqlFunctionCtx* pCtx) {
  int32_t numOfElem = 0;

  SInputColumnInfoData* pInput = &pCtx->input;
  int32_t               type = pInput->pData[0]->info.type;

  SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));

  // computing based on the true data block
  SColumnInfoData* pCol = pInput->pData[0];

  int32_t start = pInput->startRowIndex;
  int32_t end = start + pInput->numOfRows;

  switch (type) {
    case TSDB_DATA_TYPE_TINYINT: {
      int8_t* plist = (int8_t*)pCol->pData;
      for (int32_t i = start; i < end; ++i) {
        if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
          continue;
        }

        numOfElem += 1;
        pAvgRes->count += 1;
        pAvgRes->sum.isum += plist[i];
      }
      break;
    }

    case TSDB_DATA_TYPE_SMALLINT: {
      int16_t* plist = (int16_t*)pCol->pData;
      for (int32_t i = start; i < end; ++i) {
        if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
          continue;
        }

        numOfElem += 1;
        pAvgRes->count += 1;
        pAvgRes->sum.isum += plist[i];
      }
      break;
    }

    case TSDB_DATA_TYPE_INT: {
      int32_t* plist = (int32_t*)pCol->pData;
      for (int32_t i = start; i < end; ++i) {
        if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
          continue;
        }

        numOfElem += 1;
        pAvgRes->count += 1;
        pAvgRes->sum.isum += plist[i];
      }
      break;
    }

    case TSDB_DATA_TYPE_BIGINT: {
      int64_t* plist = (int64_t*)pCol->pData;
      for (int32_t i = start; i < end; ++i) {
        if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
          continue;
        }

        numOfElem += 1;
        pAvgRes->count += 1;
        pAvgRes->sum.isum += plist[i];
      }
      break;
    }

    case TSDB_DATA_TYPE_FLOAT: {
      float* plist = (float*)pCol->pData;
      for (int32_t i = start; i < end; ++i) {
        if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
          continue;
        }

        numOfElem += 1;
        pAvgRes->count += 1;
        pAvgRes->sum.dsum += plist[i];
      }
      break;
    }

    case TSDB_DATA_TYPE_DOUBLE: {
      double* plist = (double*)pCol->pData;
      for (int32_t i = start; i < end; ++i) {
        if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
          continue;
        }

        numOfElem += 1;
        pAvgRes->count += 1;
        pAvgRes->sum.dsum += plist[i];
      }
      break;
    }

    default:
      break;
  }

  // data in the check operation are all null, not output
  SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
426
/**
 * Computes result = sum / count in the AVG state (integer sum for integer
 * column types, double sum otherwise) and emits it through functionFinalize().
 *
 * With count == 0 the floating-point division yields NaN; functionFinalize()
 * marks the output as NULL whenever numOfRes is 0.
 *
 * @return the value returned by functionFinalize()
 */
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
  SInputColumnInfoData* pInput = &pCtx->input;
  int32_t               type = pInput->pData[0]->info.type;
  SAvgRes*              pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));

  if (IS_INTEGER_TYPE(type)) {
    pAvgRes->result = pAvgRes->sum.isum / ((double)pAvgRes->count);
  } else {
    pAvgRes->result = pAvgRes->sum.dsum / ((double)pAvgRes->count);
  }

  return functionFinalize(pCtx, pBlock);
}

439
/* Always requests the block statistics; neither parameter is consulted. */
EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) {
  return FUNC_DATA_REQUIRED_STATIS_LOAD;
}

443
/**
 * Initialises the result entry for MAX and seeds the intermediate buffer with
 * the smallest value of the result type (0 for unsigned types and BOOL), so
 * that the first real value can replace it.
 *
 * An unexpected result type trips assert(0).
 *
 * @return false when the entry was already initialised, true otherwise
 */
bool maxFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
  if (!functionSetup(pCtx, pResultInfo)) {
    return false;
  }

  char* buf = GET_ROWCELL_INTERBUF(pResultInfo);
  switch (pCtx->resDataInfo.type) {
    case TSDB_DATA_TYPE_INT:
      *((int32_t*)buf) = INT32_MIN;
      break;
    case TSDB_DATA_TYPE_UINT:
      *((uint32_t*)buf) = 0;
      break;
    case TSDB_DATA_TYPE_FLOAT:
      *((float*)buf) = -FLT_MAX;
      break;
    case TSDB_DATA_TYPE_DOUBLE:
      SET_DOUBLE_VAL(((double*)buf), -DBL_MAX);
      break;
    case TSDB_DATA_TYPE_BIGINT:
      *((int64_t*)buf) = INT64_MIN;
      break;
    case TSDB_DATA_TYPE_UBIGINT:
      *((uint64_t*)buf) = 0;
      break;
    case TSDB_DATA_TYPE_SMALLINT:
      *((int16_t*)buf) = INT16_MIN;
      break;
    case TSDB_DATA_TYPE_USMALLINT:
      *((uint16_t*)buf) = 0;
      break;
    case TSDB_DATA_TYPE_TINYINT:
      *((int8_t*)buf) = INT8_MIN;
      break;
    case TSDB_DATA_TYPE_UTINYINT:
      *((uint8_t*)buf) = 0;
      break;
    case TSDB_DATA_TYPE_BOOL:
      *((int8_t*)buf) = 0;
      break;
    default:
      assert(0);
  }
  return true;
}

489
/**
 * Initialises the result entry for MIN and seeds the intermediate buffer with
 * the largest value of the result type (1 for BOOL), so that the first real
 * value can replace it.
 *
 * An unexpected result type trips assert(0).
 *
 * @return false when the entry was already initialised, true otherwise
 */
bool minFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
  if (!functionSetup(pCtx, pResultInfo)) {
    return false;  // not initialized since it has been initialized
  }

  char* buf = GET_ROWCELL_INTERBUF(pResultInfo);
  switch (pCtx->resDataInfo.type) {
    case TSDB_DATA_TYPE_TINYINT:
      *((int8_t*)buf) = INT8_MAX;
      break;
    case TSDB_DATA_TYPE_UTINYINT:
      *(uint8_t*)buf = UINT8_MAX;
      break;
    case TSDB_DATA_TYPE_SMALLINT:
      *((int16_t*)buf) = INT16_MAX;
      break;
    case TSDB_DATA_TYPE_USMALLINT:
      *((uint16_t*)buf) = UINT16_MAX;
      break;
    case TSDB_DATA_TYPE_INT:
      *((int32_t*)buf) = INT32_MAX;
      break;
    case TSDB_DATA_TYPE_UINT:
      *((uint32_t*)buf) = UINT32_MAX;
      break;
    case TSDB_DATA_TYPE_BIGINT:
      *((int64_t*)buf) = INT64_MAX;
      break;
    case TSDB_DATA_TYPE_UBIGINT:
      *((uint64_t*)buf) = UINT64_MAX;
      break;
    case TSDB_DATA_TYPE_FLOAT:
      *((float*)buf) = FLT_MAX;
      break;
    case TSDB_DATA_TYPE_DOUBLE:
      SET_DOUBLE_VAL(((double*)buf), DBL_MAX);
      break;
    case TSDB_DATA_TYPE_BOOL:
      *((int8_t*)buf) = 1;
      break;
    default:
      assert(0);
  }

  return true;
}

H
Haojun Liao 已提交
536
/*
 * Reports the intermediate-buffer size of MIN/MAX: 8 bytes, enough for the
 * widest value the setup functions store (int64_t/uint64_t/double).
 * Always succeeds.
 */
bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
  pEnv->calcMemSize = sizeof(int64_t);
  return true;
}

// GET_TS_LIST views (x)->ptsList as a TSKEY array; GET_TS_DATA reads element y of it.
// Neither macro checks ptsList for NULL; callers do that themselves.
#define GET_TS_LIST(x)    ((TSKEY*)((x)->ptsList))
#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)])

// Invokes fpSet.process on every tag context listed in (ctx)->tagInfo.
// NOTE(review): the expansion ends in "while (0);" (with a semicolon), so the
// macro cannot be used as the body of an unbraced if/else. The text is kept
// as-is because this macro is also defined earlier in this file and both
// definitions must stay token-identical.
#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx)                      \
  do {                                                             \
    for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \
      SqlFunctionCtx* __ctx = (ctx)->tagInfo.pTagCtxList[_i];      \
      __ctx->fpSet.process(__ctx);                                 \
    }                                                              \
  } while (0);

552 553
// Runs fpSet.process on every subsidiary context of `ctx`. A subsidiary whose
// functionId is FUNCTION_TS_DUMMY first receives `ts` in its tag (as a BIGINT).
#define DO_UPDATE_SUBSID_RES(ctx, ts)                          \
  do {                                                         \
    for (int32_t _i = 0; _i < (ctx)->subsidiaries.num; ++_i) { \
      SqlFunctionCtx* __ctx = (ctx)->subsidiaries.pCtx[_i];    \
      if (__ctx->functionId == FUNCTION_TS_DUMMY) {            \
        __ctx->tag.i = (ts);                                   \
        __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT;              \
      }                                                        \
      __ctx->fpSet.process(__ctx);                             \
    }                                                          \
  } while (0)

// Conditionally replaces `left` with `right`.
//   sign == 0 (max): replace when left <  right
//   sign == 1 (min): replace when !(left < right), i.e. also when they are equal
// On replacement the subsidiary contexts of `ctx` are refreshed with timestamp
// `_ts` (DO_UPDATE_SUBSID_RES) and `num` is incremented by one.
#define UPDATE_DATA(ctx, left, right, num, sign, _ts) \
  do {                                                \
    if (((left) < (right)) ^ (sign)) {                \
      (left) = (right);                               \
      DO_UPDATE_SUBSID_RES(ctx, _ts);                 \
      (num) += 1;                                     \
    }                                                 \
  } while (0)

H
Haojun Liao 已提交
573
// Scans rows [_start, _start + _nrow) of column `_col` (elements of type _t),
// skipping rows flagged in the null bitmap, and feeds each value through
// UPDATE_DATA against `val`. The row timestamp is taken from the context's
// ptsList when present, otherwise 0. `num` ends up incremented once per
// replacement performed by UPDATE_DATA, not once per non-null row.
#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num)        \
  do {                                                                   \
    _t* d = (_t*)((_col)->pData);                                        \
    for (int32_t i = (_start); i < (_nrow) + (_start); ++i) {            \
      if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \
        continue;                                                        \
      }                                                                  \
      TSKEY ts = (ctx)->ptsList != NULL ? GET_TS_DATA(ctx, i) : 0;       \
      UPDATE_DATA(ctx, val, d[i], num, sign, ts);                        \
    }                                                                    \
  } while (0)

585
/**
 * Shared worker of MIN and MAX: folds the current input into the running
 * extreme stored in the entry's intermediate buffer.
 *
 * @param pCtx       function context
 * @param isMinFunc  1 for MIN, 0 for MAX (XOR-ed with the "<" comparison)
 * @return a count that callers pass to SET_VAL to decide whether a result exists.
 *
 * Two paths:
 *  - statistics path (colDataAggIsSet): compares the block's pre-computed
 *    min/max against the stored value; the matching timestamp is looked up
 *    through the statistics' minIndex/maxIndex.
 *  - raw path: scans rows [startRowIndex, startRowIndex + numOfRows).
 *
 * NOTE(review): the returned count is not uniform. In the raw path the INT
 * branch counts every non-null row, while the LOOPCHECK_N branches count only
 * the number of replacements; in the statistics path the float/double branches
 * add one extra on replacement via UPDATE_DATA. Left as in the original.
 */
int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
  int32_t numOfElems = 0;

  SInputColumnInfoData* pInput = &pCtx->input;
  SColumnDataAgg*       pAgg = pInput->pColumnDataAgg[0];

  SColumnInfoData* pCol = pInput->pData[0];
  int32_t          type = pCol->info.type;

  SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
  char*                buf = GET_ROWCELL_INTERBUF(pResInfo);

  // data in current data block are qualified to the query
  if (pInput->colDataAggIsSet) {
    numOfElems = pInput->numOfRows - pAgg->numOfNull;
    ASSERT(pInput->numOfRows == pInput->totalRows && numOfElems >= 0);

    if (numOfElems == 0) {
      return numOfElems;
    }

    void*   tval = NULL;
    int16_t index = 0;

    if (isMinFunc) {
      tval = &pInput->pColumnDataAgg[0]->min;
      index = pInput->pColumnDataAgg[0]->minIndex;
    } else {
      tval = &pInput->pColumnDataAgg[0]->max;
      index = pInput->pColumnDataAgg[0]->maxIndex;
    }

    // the index is the original position, not the relative position
    TSKEY key = (pCtx->ptsList != NULL) ? pCtx->ptsList[index] : TSKEY_INITIAL_VAL;

    if (IS_SIGNED_NUMERIC_TYPE(type)) {
      int64_t prev = 0;
      GET_TYPED_DATA(prev, int64_t, type, buf);

      int64_t val = GET_INT64_VAL(tval);
      if ((prev < val) ^ isMinFunc) {
        *(int64_t*)buf = val;
        // same per-subsidiary update the raw path performs
        DO_UPDATE_SUBSID_RES(pCtx, key);
      }
    } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
      uint64_t prev = 0;
      GET_TYPED_DATA(prev, uint64_t, type, buf);

      uint64_t val = GET_UINT64_VAL(tval);
      if ((prev < val) ^ isMinFunc) {
        *(uint64_t*)buf = val;
        DO_UPDATE_SUBSID_RES(pCtx, key);
      }
    } else if (type == TSDB_DATA_TYPE_DOUBLE) {
      double val = GET_DOUBLE_VAL(tval);
      UPDATE_DATA(pCtx, *(double*)buf, val, numOfElems, isMinFunc, key);
    } else if (type == TSDB_DATA_TYPE_FLOAT) {
      double val = GET_DOUBLE_VAL(tval);
      UPDATE_DATA(pCtx, *(float*)buf, val, numOfElems, isMinFunc, key);
    }

    return numOfElems;
  }

  int32_t start = pInput->startRowIndex;
  int32_t numOfRows = pInput->numOfRows;

  if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL) {
    if (type == TSDB_DATA_TYPE_TINYINT || type == TSDB_DATA_TYPE_BOOL) {
      LOOPCHECK_N(*(int8_t*)buf, pCol, pCtx, int8_t, numOfRows, start, isMinFunc, numOfElems);
    } else if (type == TSDB_DATA_TYPE_SMALLINT) {
      LOOPCHECK_N(*(int16_t*)buf, pCol, pCtx, int16_t, numOfRows, start, isMinFunc, numOfElems);
    } else if (type == TSDB_DATA_TYPE_INT) {
      int32_t* pData = (int32_t*)pCol->pData;
      int32_t* val = (int32_t*)buf;

      for (int32_t i = start; i < start + numOfRows; ++i) {
        if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) {
          continue;
        }

        if ((*val < pData[i]) ^ isMinFunc) {
          *val = pData[i];
          TSKEY ts = (pCtx->ptsList != NULL) ? GET_TS_DATA(pCtx, i) : 0;
          DO_UPDATE_SUBSID_RES(pCtx, ts);
        }

        numOfElems += 1;
      }

#if defined(_DEBUG_VIEW)
      // was "*retVal", an identifier that does not exist in this function
      qDebug("max value updated:%d", *val);
#endif
    } else if (type == TSDB_DATA_TYPE_BIGINT) {
      LOOPCHECK_N(*(int64_t*)buf, pCol, pCtx, int64_t, numOfRows, start, isMinFunc, numOfElems);
    }
  } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
    if (type == TSDB_DATA_TYPE_UTINYINT) {
      LOOPCHECK_N(*(uint8_t*)buf, pCol, pCtx, uint8_t, numOfRows, start, isMinFunc, numOfElems);
    } else if (type == TSDB_DATA_TYPE_USMALLINT) {
      LOOPCHECK_N(*(uint16_t*)buf, pCol, pCtx, uint16_t, numOfRows, start, isMinFunc, numOfElems);
    } else if (type == TSDB_DATA_TYPE_UINT) {
      LOOPCHECK_N(*(uint32_t*)buf, pCol, pCtx, uint32_t, numOfRows, start, isMinFunc, numOfElems);
    } else if (type == TSDB_DATA_TYPE_UBIGINT) {
      LOOPCHECK_N(*(uint64_t*)buf, pCol, pCtx, uint64_t, numOfRows, start, isMinFunc, numOfElems);
    }
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    LOOPCHECK_N(*(double*)buf, pCol, pCtx, double, numOfRows, start, isMinFunc, numOfElems);
  } else if (type == TSDB_DATA_TYPE_FLOAT) {
    LOOPCHECK_N(*(float*)buf, pCol, pCtx, float, numOfRows, start, isMinFunc, numOfElems);
  }

  return numOfElems;
}
715

716
/**
 * MIN entry point: runs doMinMaxHelper() in min mode and marks the entry as
 * having one result when the helper reports a positive count.
 *
 * @return TSDB_CODE_SUCCESS
 */
int32_t minFunction(SqlFunctionCtx* pCtx) {
  int32_t numOfElems = doMinMaxHelper(pCtx, 1);
  SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
  return TSDB_CODE_SUCCESS;
}

722
/**
 * MAX entry point: runs doMinMaxHelper() in max mode and marks the entry as
 * having one result when the helper reports a positive count.
 *
 * @return TSDB_CODE_SUCCESS
 */
int32_t maxFunction(SqlFunctionCtx* pCtx) {
  int32_t numOfElems = doMinMaxHelper(pCtx, 0);
  SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
  return TSDB_CODE_SUCCESS;
}

/*
 * Reports the intermediate-buffer size of STDDEV: one SStddevRes.
 * Always succeeds. pFunc is not consulted and is marked unused like in the
 * other get*FuncEnv helpers of this file.
 */
bool getStddevFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
  pEnv->calcMemSize = sizeof(SStddevRes);
  return true;
}

733
/**
 * Initialises the result entry for STDDEV and zeroes its SStddevRes state.
 *
 * @return false when the entry was already initialised (state left untouched),
 *         true otherwise
 */
bool stddevFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
  if (!functionSetup(pCtx, pResultInfo)) {
    return false;
  }

  SStddevRes* pRes = GET_ROWCELL_INTERBUF(pResultInfo);
  memset(pRes, 0, sizeof(SStddevRes));
  return true;
}

H
Haojun Liao 已提交
743
/**
 * Accumulates STDDEV state (count, sum, sum of squares) over the non-null
 * values in rows [startRowIndex, startRowIndex + numOfRows) of the first input
 * column.
 *
 * Signed integer columns use the int64 members (isum / quadraticISum),
 * float/double columns the double members (dsum / quadraticDSum). Any other
 * column type falls through the switch and contributes nothing.
 *
 * Integer squares are computed in 64 bits: squaring an int32_t in plain `int`
 * overflows for |value| > 46340, which is undefined behaviour. Squares of
 * BIGINT values can still exceed the int64 range.
 *
 * @return TSDB_CODE_SUCCESS
 */
int32_t stddevFunction(SqlFunctionCtx* pCtx) {
  int32_t numOfElem = 0;

  SInputColumnInfoData* pInput = &pCtx->input;
  int32_t               type = pInput->pData[0]->info.type;

  SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));

  // computing based on the true data block
  SColumnInfoData* pCol = pInput->pData[0];

  int32_t start = pInput->startRowIndex;
  int32_t end = start + pInput->numOfRows;

  switch (type) {
    case TSDB_DATA_TYPE_TINYINT: {
      int8_t* plist = (int8_t*)pCol->pData;
      for (int32_t i = start; i < end; ++i) {
        if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
          continue;
        }

        numOfElem += 1;
        pStddevRes->count += 1;
        pStddevRes->isum += plist[i];
        pStddevRes->quadraticISum += (int64_t)plist[i] * plist[i];
      }
      break;
    }

    case TSDB_DATA_TYPE_SMALLINT: {
      int16_t* plist = (int16_t*)pCol->pData;
      for (int32_t i = start; i < end; ++i) {
        if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
          continue;
        }

        numOfElem += 1;
        pStddevRes->count += 1;
        pStddevRes->isum += plist[i];
        pStddevRes->quadraticISum += (int64_t)plist[i] * plist[i];
      }
      break;
    }

    case TSDB_DATA_TYPE_INT: {
      int32_t* plist = (int32_t*)pCol->pData;
      for (int32_t i = start; i < end; ++i) {
        if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
          continue;
        }

        numOfElem += 1;
        pStddevRes->count += 1;
        pStddevRes->isum += plist[i];
        // widen before multiplying: int32 * int32 does not fit in int
        pStddevRes->quadraticISum += (int64_t)plist[i] * plist[i];
      }
      break;
    }

    case TSDB_DATA_TYPE_BIGINT: {
      int64_t* plist = (int64_t*)pCol->pData;
      for (int32_t i = start; i < end; ++i) {
        if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
          continue;
        }

        numOfElem += 1;
        pStddevRes->count += 1;
        pStddevRes->isum += plist[i];
        pStddevRes->quadraticISum += plist[i] * plist[i];
      }
      break;
    }

    case TSDB_DATA_TYPE_FLOAT: {
      float* plist = (float*)pCol->pData;
      for (int32_t i = start; i < end; ++i) {
        if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
          continue;
        }

        numOfElem += 1;
        pStddevRes->count += 1;
        pStddevRes->dsum += plist[i];
        pStddevRes->quadraticDSum += plist[i] * plist[i];
      }
      break;
    }

    case TSDB_DATA_TYPE_DOUBLE: {
      double* plist = (double*)pCol->pData;
      for (int32_t i = start; i < end; ++i) {
        if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
          continue;
        }

        numOfElem += 1;
        pStddevRes->count += 1;
        pStddevRes->dsum += plist[i];
        pStddevRes->quadraticDSum += plist[i] * plist[i];
      }
      break;
    }

    default:
      break;
  }

  // data in the check operation are all null, not output
  SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
860
/**
 * Computes the population standard deviation from the accumulated state as
 * sqrt(E[x^2] - E[x]^2) and emits it through functionFinalize().
 *
 * The difference E[x^2] - E[x]^2 can come out marginally below zero through
 * floating-point rounding (e.g. when all inputs are equal); it is clamped to 0
 * so that sqrt() does not turn such a case into NaN. With count == 0 the
 * variance is NaN, the clamp does not fire, and functionFinalize() marks the
 * output as NULL whenever numOfRes is 0.
 *
 * @return the value returned by functionFinalize()
 */
int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
  SInputColumnInfoData* pInput = &pCtx->input;
  int32_t               type = pInput->pData[0]->info.type;
  SStddevRes*           pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
  double                avg;
  double                variance;

  if (IS_INTEGER_TYPE(type)) {
    avg = pStddevRes->isum / ((double)pStddevRes->count);
    variance = pStddevRes->quadraticISum / ((double)pStddevRes->count) - avg * avg;
  } else {
    avg = pStddevRes->dsum / ((double)pStddevRes->count);
    variance = pStddevRes->quadraticDSum / ((double)pStddevRes->count) - avg * avg;
  }

  // NaN compares false here and is passed through unchanged
  if (variance < 0) {
    variance = 0;
  }
  pStddevRes->result = sqrt(variance);

  return functionFinalize(pCtx, pBlock);
}

// Report the size of the intermediate buffer percentile() needs: one SPercentileInfo.
// pFunc is not consulted. Always returns true.
bool getPercentileFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
  (void)pFunc;

  pEnv->calcMemSize = sizeof(SPercentileInfo);
  return true;
}

881
// Prepare the percentile state for the first scan, which only collects the value range
// and the number of non-null elements.
// Returns false when the generic functionSetup() refuses, true otherwise.
bool percentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
  if (functionSetup(pCtx, pResultInfo)) {
    SPercentileInfo* pPercentile = GET_ROWCELL_INTERBUF(pResultInfo);

    // start with an empty range so that any real value replaces both bounds
    SET_DOUBLE_VAL(&pPercentile->minval, DBL_MAX);
    SET_DOUBLE_VAL(&pPercentile->maxval, -DBL_MAX);
    pPercentile->numOfElems = 0;
    return true;
  }

  return false;
}

895 896 897
// Feed one input block into the two-pass percentile computation.
//
// Stage 0 (first scan): only the min/max of the non-null values and their count are
// collected, either from the pre-computed column aggregates or row by row.
// Stage 1 (entered on the first call with currentStage == REPEAT_SCAN): a memory bucket
// is created over the collected [min, max] range and every non-null value is put into it.
//
// Returns 0 / TSDB_CODE_SUCCESS.
int32_t percentileFunction(SqlFunctionCtx* pCtx) {
  int32_t              notNullElems = 0;
  SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);

  SInputColumnInfoData* pInput = &pCtx->input;
  // only touch the pre-computed aggregates when they are flagged as present,
  // the same way firstFunction()/lastFunction() do
  SColumnDataAgg* pAgg = (pInput->colDataAggIsSet) ? pInput->pColumnDataAgg[0] : NULL;

  SColumnInfoData* pCol = pInput->pData[0];
  int32_t          type = pCol->info.type;

  SPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
  if (pCtx->currentStage == REPEAT_SCAN && pInfo->stage == 0) {
    pInfo->stage += 1;

    // all data are null, set it completed
    if (pInfo->numOfElems == 0) {
      pResInfo->complete = true;
      return 0;
    } else {
      // NOTE(review): the result of tMemBucketCreate() is not checked here; confirm how an
      // allocation failure should be reported before tMemBucketPut() is reached.
      pInfo->pMemBucket = tMemBucketCreate(pCtx->inputBytes, pCtx->inputType, pInfo->minval, pInfo->maxval);
    }
  }

  // the first stage, only acquire the min/max value
  if (pInfo->stage == 0) {
    if (pInput->colDataAggIsSet) {
      int32_t numOfElems = pInput->numOfRows - pAgg->numOfNull;

      // a block without any non-null value must not contribute to the value range
      if (numOfElems != 0) {
        double tmin = 0.0, tmax = 0.0;
        if (IS_SIGNED_NUMERIC_TYPE(type)) {
          tmin = (double)GET_INT64_VAL(&pAgg->min);
          tmax = (double)GET_INT64_VAL(&pAgg->max);
        } else if (IS_FLOAT_TYPE(type)) {
          tmin = GET_DOUBLE_VAL(&pAgg->min);
          tmax = GET_DOUBLE_VAL(&pAgg->max);
        } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
          tmin = (double)GET_UINT64_VAL(&pAgg->min);
          tmax = (double)GET_UINT64_VAL(&pAgg->max);
        }

        if (GET_DOUBLE_VAL(&pInfo->minval) > tmin) {
          SET_DOUBLE_VAL(&pInfo->minval, tmin);
        }

        if (GET_DOUBLE_VAL(&pInfo->maxval) < tmax) {
          SET_DOUBLE_VAL(&pInfo->maxval, tmax);
        }

        pInfo->numOfElems += numOfElems;
      }
    } else {
      // check the valid data one by one
      int32_t start = pInput->startRowIndex;
      for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
        if (colDataIsNull_f(pCol->nullbitmap, i)) {
          continue;
        }

        char* data = colDataGetData(pCol, i);

        double v = 0;
        GET_TYPED_DATA(v, double, pCtx->inputType, data);
        if (v < GET_DOUBLE_VAL(&pInfo->minval)) {
          SET_DOUBLE_VAL(&pInfo->minval, v);
        }

        if (v > GET_DOUBLE_VAL(&pInfo->maxval)) {
          SET_DOUBLE_VAL(&pInfo->maxval, v);
        }

        pInfo->numOfElems += 1;
      }
    }

    return 0;
  }

  // the second stage, calculate the true percentile value
  int32_t start = pInput->startRowIndex;
  for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
    if (colDataIsNull_f(pCol->nullbitmap, i)) {
      continue;
    }

    char* data = colDataGetData(pCol, i);

    notNullElems += 1;
    tMemBucketPut(pInfo->pMemBucket, data, 1);
  }

  SET_VAL(pResInfo, notNullElems, 1);
  return TSDB_CODE_SUCCESS;
}

986
// Compute the requested percentile from the memory bucket filled during the second scan,
// release the bucket and delegate the output to functionFinalize().
//
// The percentile argument is read from the second function parameter; it is taken from
// the integer member when the variant is tagged TSDB_DATA_TYPE_INT, from the double
// member otherwise.
//
// Returns the value returned by functionFinalize().
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
  SVariant* pVal = &pCtx->param[1].param;
  double    v = pVal->nType == TSDB_DATA_TYPE_INT ? pVal->i : pVal->d;

  SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
  SPercentileInfo*     ppInfo = (SPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo);

  tMemBucket* pMemBucket = ppInfo->pMemBucket;
  if (pMemBucket != NULL && pMemBucket->total > 0) {  // check for null
    SET_DOUBLE_VAL(&ppInfo->result, getPercentile(pMemBucket, v));
  }

  tMemBucketDestroy(pMemBucket);
  // the bucket no longer exists: do not leave a dangling pointer in the result buffer
  ppInfo->pMemBucket = NULL;

  return functionFinalize(pCtx, pBlock);
}
H
Haojun Liao 已提交
1001

H
Haojun Liao 已提交
1002 1003
// Report the intermediate buffer size for first()/last(): the bytes of the result type of
// the first parameter plus one int64_t (firstFunction()/lastFunction() keep the row
// timestamp right behind the value). Always returns true.
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
  SColumnNode* pColNode = nodesListGetNode(pFunc->pParameterList, 0);

  pEnv->calcMemSize = sizeof(int64_t) + pColNode->node.resType.bytes;
  return true;
}

1008 1009 1010 1011 1012
// Fetch the timestamp stored at rowIndex of the given timestamp column.
// A missing column (NULL) yields 0.
static FORCE_INLINE TSKEY getRowPTs(SColumnInfoData* pTsColInfo, int32_t rowIndex) {
  if (pTsColInfo != NULL) {
    return *(TSKEY*)colDataGetData(pTsColInfo, rowIndex);
  }

  return 0;
}

1016 1017
// This ordinary first function does not care if current scan is ascending order or descending order scan
// the OPTIMIZED version of first function will only handle the ascending order scan
1018
// first(): keep the non-null value with the smallest timestamp seen so far.
//
// The result buffer holds the value (info.bytes of the input column) followed by its
// timestamp. The block order is derived from the first and the last timestamp of the
// block, and rows are visited starting at the end holding the smallest timestamp.
// The scan stops at the first row that replaces the stored value.
//
// Returns 0 / TSDB_CODE_SUCCESS.
int32_t firstFunction(SqlFunctionCtx* pCtx) {
  SResultRowEntryInfo*  pResInfo = GET_RES_INFO(pCtx);
  char*                 buf = GET_ROWCELL_INTERBUF(pResInfo);
  SInputColumnInfoData* pInput = &pCtx->input;
  SColumnInfoData*      pInputCol = pInput->pData[0];
  int32_t               bytes = pInputCol->info.bytes;

  // All null data column, return directly.
  if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) {
    ASSERT(pInputCol->hasNull == true);
    return 0;
  }

  SColumnDataAgg* pColAgg = NULL;
  if (pInput->colDataAggIsSet) {
    pColAgg = pInput->pColumnDataAgg[0];
  }

  TSKEY headKey = getRowPTs(pInput->pPTS, 0);
  TSKEY tailKey = getRowPTs(pInput->pPTS, pInput->totalRows - 1);
  bool  ascending = (headKey <= tailKey);

  // Descending blocks usually come from nested queries; their smallest timestamp is at the tail.
  TSKEY smallestKey = ascending ? headKey : tailKey;

  // the stored result is already older than everything in this block
  if (pResInfo->numOfRes > 0) {
    TSKEY savedTs = *(TSKEY*)(buf + bytes);
    if (savedTs < smallestKey) {
      return TSDB_CODE_SUCCESS;
    }
  }

  int32_t firstRow = pInput->startRowIndex;
  int32_t lastRow = pInput->numOfRows + pInput->startRowIndex - 1;
  int32_t stride = ascending ? 1 : -1;
  int32_t numOfElems = 0;

  for (int32_t row = ascending ? firstRow : lastRow; row >= firstRow && row <= lastRow; row += stride) {
    if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, row, pColAgg)) {
      continue;
    }

    numOfElems++;

    char* data = colDataGetData(pInputCol, row);
    TSKEY cts = getRowPTs(pInput->pPTS, row);

    if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) > cts) {
      memcpy(buf, data, bytes);
      *(TSKEY*)(buf + bytes) = cts;

      pResInfo->numOfRes = 1;
      break;
    }
  }

  SET_VAL(pResInfo, numOfElems, 1);
  return TSDB_CODE_SUCCESS;
}

1104
// last(): keep the non-null value with the largest timestamp seen so far.
//
// The result buffer holds the value (info.bytes of the input column) followed by its
// timestamp. The block order is derived from the first and the last timestamp of the
// block, and rows are visited starting at the end holding the largest timestamp.
// Only the first non-null row met is examined, then the scan stops.
//
// Returns 0 / TSDB_CODE_SUCCESS.
int32_t lastFunction(SqlFunctionCtx* pCtx) {
  SResultRowEntryInfo*  pResInfo = GET_RES_INFO(pCtx);
  char*                 buf = GET_ROWCELL_INTERBUF(pResInfo);
  SInputColumnInfoData* pInput = &pCtx->input;
  SColumnInfoData*      pInputCol = pInput->pData[0];
  int32_t               bytes = pInputCol->info.bytes;

  // All null data column, return directly.
  if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) {
    ASSERT(pInputCol->hasNull == true);
    return 0;
  }

  SColumnDataAgg* pColAgg = NULL;
  if (pInput->colDataAggIsSet) {
    pColAgg = pInput->pColumnDataAgg[0];
  }

  TSKEY headKey = getRowPTs(pInput->pPTS, 0);
  TSKEY tailKey = getRowPTs(pInput->pPTS, pInput->totalRows - 1);
  bool  ascending = (headKey <= tailKey);

  int32_t firstRow = pInput->startRowIndex;
  int32_t lastRow = pInput->numOfRows + pInput->startRowIndex - 1;
  // ascending blocks are walked backwards, descending blocks forwards
  int32_t stride = ascending ? -1 : 1;
  int32_t numOfElems = 0;

  for (int32_t row = ascending ? lastRow : firstRow; row >= firstRow && row <= lastRow; row += stride) {
    if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, row, pColAgg)) {
      continue;
    }

    numOfElems++;

    char* data = colDataGetData(pInputCol, row);
    TSKEY cts = getRowPTs(pInput->pPTS, row);
    if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) < cts) {
      memcpy(buf, data, bytes);
      *(TSKEY*)(buf + bytes) = cts;
      pResInfo->numOfRes = 1;
    }

    // the first non-null row in scan direction is the only candidate of this block
    break;
  }

  SET_VAL(pResInfo, numOfElems, 1);
  return TSDB_CODE_SUCCESS;
}
H
Haojun Liao 已提交
1169

H
Haojun Liao 已提交
1170 1171 1172 1173 1174
// Report the size of the intermediate buffer diff() needs: one SDiffInfo.
// The function node is not consulted. Always returns true.
bool getDiffFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
  pEnv->calcMemSize = sizeof(SDiffInfo);

  return true;
}

1175
// Reset the diff() state: no previous value yet, all option flags cleared.
// Returns false when the generic functionSetup() refuses, true otherwise.
bool diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
  if (functionSetup(pCtx, pResInfo)) {
    SDiffInfo* pState = GET_ROWCELL_INTERBUF(pResInfo);

    pState->prev.i64 = 0;
    pState->hasPrev = false;

    pState->firstOutput = false;
    pState->includeNull = false;
    pState->ignoreNegative = false;  // TODO set correct param
    return true;
  }

  return false;
}

1189 1190 1191
// diff(): write the difference between each non-null value and the previous non-null
// value into the output column, and (when both are available) the matching timestamp
// into pCtx->pTsOutput.
//
// The previous value survives across blocks in SDiffInfo. The very first value only
// seeds that state, which is why the first block yields one row less than the number
// of values consumed.
//
// Only TSDB_DATA_TYPE_INT and TSDB_DATA_TYPE_BIGINT are handled.
// TODO: DOUBLE, FLOAT, SMALLINT and TINYINT are not implemented yet.
//
// Returns the number of output rows produced for this block, 0 if there is none.
int32_t diffFunction(SqlFunctionCtx* pCtx) {
  SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
  SDiffInfo*           pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo);

  SInputColumnInfoData* pInput = &pCtx->input;
  SColumnInfoData*      pInputCol = pInput->pData[0];

  bool    isFirstBlock = (pDiffInfo->hasPrev == false);
  int32_t numOfElems = 0;

  SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
  SColumnInfoData* pTsOutput = pCtx->pTsOutput;

  // the timestamp column is optional (see getRowPTs), so do not dereference it blindly
  TSKEY* tsList = (pInput->pPTS != NULL) ? (TSKEY*)pInput->pPTS->pData : NULL;
  // a timestamp can only be copied when both the source list and the destination exist
  bool saveTs = (tsList != NULL) && (pTsOutput != NULL);

  // Visit the rows in scan direction without ever leaving [firstRow, lastRow]:
  // a backward scan starts at the last row of the block.
  int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
  int32_t firstRow = pInput->startRowIndex;
  int32_t lastRow = pInput->numOfRows + pInput->startRowIndex - 1;
  int32_t beginRow = (step > 0) ? firstRow : lastRow;

  int32_t startOffset = pCtx->offset;
  switch (pInputCol->info.type) {
    case TSDB_DATA_TYPE_INT: {
      for (int32_t i = beginRow; i >= firstRow && i <= lastRow; i += step) {
        // NOTE(review): for a leading NULL in the first block with includeNull set, pos is
        // startOffset - 1; includeNull is always false after diffFunctionSetup() today.
        int32_t pos = startOffset + (isFirstBlock ? (numOfElems - 1) : numOfElems);
        if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
          if (pDiffInfo->includeNull) {
            colDataSetNull_f(pOutput->nullbitmap, pos);
            if (saveTs) {
              colDataAppendInt64(pTsOutput, pos, &tsList[i]);
            }

            numOfElems += 1;
          }
          continue;
        }

        int32_t v = *(int32_t*)colDataGetData(pInputCol, i);
        if (pDiffInfo->hasPrev) {
          int32_t delta = (int32_t)(v - pDiffInfo->prev.i64);  // direct previous may be null
          if (delta < 0 && pDiffInfo->ignoreNegative) {
            colDataSetNull_f(pOutput->nullbitmap, pos);
          } else {
            colDataAppendInt32(pOutput, pos, &delta);
          }

          if (saveTs) {
            colDataAppendInt64(pTsOutput, pos, &tsList[i]);
          }
        }

        pDiffInfo->prev.i64 = v;
        pDiffInfo->hasPrev = true;
        numOfElems++;
      }
      break;
    }

    case TSDB_DATA_TYPE_BIGINT: {
      for (int32_t i = beginRow; i >= firstRow && i <= lastRow; i += step) {
        int32_t pos = startOffset + (isFirstBlock ? (numOfElems - 1) : numOfElems);
        if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
          if (pDiffInfo->includeNull) {
            colDataSetNull_f(pOutput->nullbitmap, pos);
            if (saveTs) {
              colDataAppendInt64(pTsOutput, pos, &tsList[i]);
            }

            numOfElems += 1;
          }
          continue;
        }

        // read the full 64-bit value for every row, it becomes the next "previous" value
        int64_t v = *(int64_t*)colDataGetData(pInputCol, i);
        if (pDiffInfo->hasPrev) {
          int64_t delta = v - pDiffInfo->prev.i64;  // direct previous may be null
          if (delta < 0 && pDiffInfo->ignoreNegative) {
            colDataSetNull_f(pOutput->nullbitmap, pos);
          } else {
            colDataAppendInt64(pOutput, pos, &delta);
          }

          if (saveTs) {
            colDataAppendInt64(pTsOutput, pos, &tsList[i]);
          }
        }

        pDiffInfo->prev.i64 = v;
        pDiffInfo->hasPrev = true;
        numOfElems++;
      }
      break;
    }

    default:
      break;
      //      qError("error input type");
  }

  // initial value is not set yet
  if (!pDiffInfo->hasPrev || numOfElems <= 0) {
    /*
     * 1. current block and blocks before are full of null
     * 2. current block may be null value
     */
    assert(pCtx->hasNull);
    return 0;
  } else {
    // the first value of the first block only seeds the state and produces no row
    int32_t forwardStep = (isFirstBlock) ? numOfElems - 1 : numOfElems;
    return forwardStep;
  }
}
H
Haojun Liao 已提交
1400

1401
// Report the intermediate buffer size for top()/bottom(): the STopBotRes header plus one
// STopBotResItem per requested row; the count is the value of the second parameter.
// Always returns true.
bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
  SValueNode* pLimitNode = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);

  pEnv->calcMemSize = pLimitNode->datum.i * sizeof(STopBotResItem) + sizeof(STopBotRes);
  return true;
}

1407 1408 1409 1410
// Return the top/bottom state kept in the result buffer of pCtx.
// The item array lives directly behind the STopBotRes header in the same buffer, so the
// pItems pointer is re-derived from the header address on every call.
static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) {
  STopBotRes* pTopBot = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));

  pTopBot->pItems = (STopBotResItem*)(pTopBot + 1);
  return pTopBot;
}

1415 1416
// Forward declaration (defined further down): offer the value pData of the given type,
// taken from row rowIndex of pSrcBlock, to the top-k result set of pCtx.
static void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
                            uint64_t uid, SResultRowEntryInfo* pEntryInfo);
1417

1418 1419 1420
// Forward declarations (defined further down): both copy the column values of row rowIndex
// of pSrcBlock into a buffer page. saveTupleData appends a new tuple and records its
// position in pItem->tuplePos, copyTupleData overwrites the tuple already recorded there.
static void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem);
static void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem);

1421 1422 1423
// top(): offer every non-null value of the input block to the top-k result set.
// Returns TSDB_CODE_SUCCESS.
int32_t topFunction(SqlFunctionCtx* pCtx) {
  SResultRowEntryInfo*  pResInfo = GET_RES_INFO(pCtx);
  SInputColumnInfoData* pInput = &pCtx->input;
  SColumnInfoData*      pCol = pInput->pData[0];

  int32_t type = pCol->info.type;
  int32_t beginRow = pInput->startRowIndex;
  int32_t endRow = pInput->numOfRows + beginRow;  // exclusive

  for (int32_t row = beginRow; row < endRow; ++row) {
    if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, row)) {
      continue;
    }

    char* data = colDataGetData(pCol, row);
    doAddIntoResult(pCtx, data, row, pCtx->pSrcBlock, type, pInput->uid, pResInfo);
  }

  return TSDB_CODE_SUCCESS;
}

1450 1451
static int32_t topBotResComparFn(const void* p1, const void* p2, const void* param) {
  uint16_t type = *(uint16_t*)param;
1452

1453 1454
  STopBotResItem* val1 = (STopBotResItem*)p1;
  STopBotResItem* val2 = (STopBotResItem*)p2;
1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476

  if (IS_SIGNED_NUMERIC_TYPE(type)) {
    if (val1->v.i == val2->v.i) {
      return 0;
    }

    return (val1->v.i > val2->v.i) ? 1 : -1;
  } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
    if (val1->v.u == val2->v.u) {
      return 0;
    }

    return (val1->v.u > val2->v.u) ? 1 : -1;
  }

  if (val1->v.d == val2->v.d) {
    return 0;
  }

  return (val1->v.d > val2->v.d) ? 1 : -1;
}

1477 1478
// Offer one value to the top-k result set.
//
// While fewer than k items are stored (k is the second function parameter) the value is
// appended, its row is saved with saveTupleData() and the items are re-sorted as a heap.
// Once the set is full, the value replaces item 0 only if it is greater than it; the row
// data is then overwritten in place with copyTupleData() and the heap is adjusted.
void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
                     uint64_t uid, SResultRowEntryInfo* pEntryInfo) {
  STopBotRes* pRes = getTopBotOutputInfo(pCtx);
  int32_t     capacity = pCtx->param[1].param.i;

  SVariant candidate = {0};
  taosVariantCreateFromBinary(&candidate, pData, tDataTypes[type].bytes, type);

  STopBotResItem* pItems = pRes->pItems;
  assert(pItems != NULL);

  if (pEntryInfo->numOfRes >= capacity) {
    // full: the candidate has to beat the item at the root of the heap
    STopBotResItem* pRoot = &pItems[0];

    bool isGreater = (IS_SIGNED_NUMERIC_TYPE(type) && candidate.i > pRoot->v.i) ||
                     (IS_UNSIGNED_NUMERIC_TYPE(type) && candidate.u > pRoot->v.u) ||
                     (IS_FLOAT_TYPE(type) && candidate.d > pRoot->v.d);
    if (!isGreater) {
      return;
    }

    // replace the old data and the corresponding tuple data
    pRoot->v = candidate;
    pRoot->uid = uid;

    // save the data of this tuple by overwriting the old data
    copyTupleData(pCtx, rowIndex, pSrcBlock, pRoot);

    taosheapadjust((void*)pItems, sizeof(STopBotResItem), 0, pEntryInfo->numOfRes - 1, (const void*)&type,
                   topBotResComparFn, NULL, false);
    return;
  }

  // not full yet: take the next free slot
  STopBotResItem* pSlot = &pItems[pEntryInfo->numOfRes];
  pSlot->v = candidate;
  pSlot->uid = uid;

  // save the data of this tuple
  saveTupleData(pCtx, rowIndex, pSrcBlock, pSlot);

  pEntryInfo->numOfRes++;
  taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn,
               false);
}
1517

1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571
// Append the column values of row rowIndex of pSrcBlock to the current buffer page of
// pCtx and record where they were put in pItem->tuplePos.
//
// Layout of one stored tuple: one bool null flag per column, followed by the column
// values, where column i starts at the sum of info.bytes of the columns before it
// (this is the offset topBotFinalize() computes when reading the tuple back).
// A new page is started when no page exists yet or the current one cannot hold the tuple.
void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem) {
  SFilePage* pPage = NULL;

  int32_t completeRowSize = pSrcBlock->info.rowSize + pSrcBlock->info.numOfCols * sizeof(bool);

  if (pCtx->curBufPage == -1) {
    pPage = getNewBufPage(pCtx->pBuf, 0, &pCtx->curBufPage);
    pPage->num = sizeof(SFilePage);
  } else {
    pPage = getBufPage(pCtx->pBuf, pCtx->curBufPage);
    if (pPage->num + completeRowSize > getBufPageSize(pCtx->pBuf)) {
      // the full page is not used any more, hand it back before switching to a new one
      releaseBufPage(pCtx->pBuf, pPage);

      pPage = getNewBufPage(pCtx->pBuf, 0, &pCtx->curBufPage);
      pPage->num = sizeof(SFilePage);
    }
  }

  pItem->tuplePos.pageId = pCtx->curBufPage;

  // keep the current row data, extract method
  int32_t offset = 0;
  bool*   nullList = (bool*)((char*)pPage + pPage->num);
  char*   pStart = (char*)(nullList + sizeof(bool) * pSrcBlock->info.numOfCols);
  for (int32_t i = 0; i < pSrcBlock->info.numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, i);

    // write the flag for every column, the page memory may hold stale data
    bool isNull = colDataIsNull_s(pCol, rowIndex);
    nullList[i] = isNull;

    if (!isNull) {
      char* p = colDataGetData(pCol, rowIndex);
      if (IS_VAR_DATA_TYPE(pCol->info.type)) {
        memcpy(pStart + offset, p, varDataTLen(p));
      } else {
        memcpy(pStart + offset, p, pCol->info.bytes);
      }
    }

    // a null column still occupies its slot, otherwise the following columns would be
    // stored at offsets the reader does not expect
    offset += pCol->info.bytes;
  }

  pItem->tuplePos.offset = pPage->num;
  pPage->num += completeRowSize;

  setBufPageDirty(pPage, true);
  releaseBufPage(pCtx->pBuf, pPage);
}

// Overwrite the tuple recorded in pItem->tuplePos with the column values of row rowIndex
// of pSrcBlock.
//
// The layout matches saveTupleData(): one bool null flag per column, followed by the
// column values, where column i starts at the sum of info.bytes of the columns before it.
void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem) {
  SFilePage* pPage = getBufPage(pCtx->pBuf, pItem->tuplePos.pageId);

  bool* nullList = (bool*)((char*)pPage + pItem->tuplePos.offset);
  char* pStart = (char*)(nullList + pSrcBlock->info.numOfCols * sizeof(bool));

  int32_t offset = 0;
  for (int32_t i = 0; i < pSrcBlock->info.numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, i);

    nullList[i] = colDataIsNull_s(pCol, rowIndex);
    if (!nullList[i]) {
      char* p = colDataGetData(pCol, rowIndex);
      if (IS_VAR_DATA_TYPE(pCol->info.type)) {
        memcpy(pStart + offset, p, varDataTLen(p));
      } else {
        memcpy(pStart + offset, p, pCol->info.bytes);
      }
    }

    // a null column still occupies its slot, otherwise the following columns would be
    // stored at offsets the reader does not expect
    offset += pCol->info.bytes;
  }

  setBufPageDirty(pPage, true);
  releaseBufPage(pCtx->pBuf, pPage);
}

// Emit the collected top/bottom items into pBlock, starting at row pBlock->info.rows.
//
// For every item the value goes into the result column of this function. If a tuple was
// saved for the item (tuplePos.pageId != -1), each subsidiary function gets the value of
// its source column from that tuple written into its own result column on the same row.
//
// Only TSDB_DATA_TYPE_INT input is handled so far.
// TODO: other input types emit no rows, yet numOfRes is still returned for them.
//
// Returns pEntryInfo->numOfRes.
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
  SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
  // go through the accessor so that pItems is re-derived from the buffer address
  STopBotRes* pRes = getTopBotOutputInfo(pCtx);
  pEntryInfo->complete = true;

  int32_t          type = pCtx->input.pData[0]->info.type;
  int32_t          slotId = pCtx->pExpr->base.resSchema.slotId;
  SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);

  // todo assign the tag value and the corresponding row data
  int32_t currentRow = pBlock->info.rows;
  switch (type) {
    case TSDB_DATA_TYPE_INT: {
      for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) {
        STopBotResItem* pItem = &pRes->pItems[i];

        // narrow through a local instead of reinterpreting the 64-bit member in place,
        // which would pick the wrong half on a big-endian machine
        int32_t v = (int32_t)pItem->v.i;
        colDataAppendInt32(pCol, currentRow, &v);

        int32_t pageId = pItem->tuplePos.pageId;
        int32_t offset = pItem->tuplePos.offset;
        if (pageId != -1) {
          SFilePage* pPage = getBufPage(pCtx->pBuf, pageId);

          bool* nullList = (bool*)((char*)pPage + offset);
          char* pStart = (char*)(nullList + pCtx->pSrcBlock->info.numOfCols * sizeof(bool));

          // todo set the offset value to optimize the performance.
          for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
            SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];

            SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0];
            int32_t      srcSlotId = pFuncParam->pCol->slotId;
            // the destination is the subsidiary's own result column, not the one of top()
            int32_t dstSlotId = pc->pExpr->base.resSchema.slotId;

            // byte offset of the source column inside the saved tuple
            int32_t ps = 0;
            for (int32_t k = 0; k < srcSlotId; ++k) {
              SColumnInfoData* pSrcCol = taosArrayGet(pCtx->pSrcBlock->pDataBlock, k);
              ps += pSrcCol->info.bytes;
            }

            SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
            if (nullList[srcSlotId]) {
              colDataAppendNULL(pDstCol, currentRow);
            } else {
              colDataAppend(pDstCol, currentRow, (pStart + ps), false);
            }
          }

          // pair the getBufPage() above, as saveTupleData()/copyTupleData() do
          releaseBufPage(pCtx->pBuf, pPage);
        }

        currentRow += 1;
      }

      break;
    }

    default:
      break;
  }

  return pEntryInfo->numOfRes;
}
G
Ganlin Zhao 已提交
1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746

// Report the size of the intermediate buffer spread() needs: one SSpreadInfo.
// The function node is not consulted. Always returns true.
bool getSpreadFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
  pEnv->calcMemSize = sizeof(SSpreadInfo);

  return true;
}

/*
 * Prepare the intermediate SSpreadInfo buffer of a result entry before any input is consumed.
 *
 * Runs the common functionSetup() first; when that fails nothing else is touched and false is
 * returned. Otherwise min starts at DBL_MAX and max at -DBL_MAX, so that the first value merged
 * replaces both, and hasResult is cleared. Returns true in that case.
 */
bool spreadFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
  if (functionSetup(pCtx, pResultInfo)) {
    SSpreadInfo* pSpread = GET_ROWCELL_INTERBUF(pResultInfo);

    pSpread->hasResult = false;
    SET_DOUBLE_VAL(&pSpread->max, -DBL_MAX);
    SET_DOUBLE_VAL(&pSpread->min, DBL_MAX);
    return true;
  }

  return false;
}

/*
 * Merge one batch of input into the running minimum/maximum kept for the spread function.
 *
 * Two kinds of input are handled:
 *   - pInput->colDataAggIsSet is set: only the pre-computed aggregate of the first input column is
 *     used. Its min/max are converted to double according to the column type and merged into the
 *     intermediate SSpreadInfo.
 *   - otherwise: rows [startRowIndex, startRowIndex + numOfRows) of the first input column are
 *     scanned and every non-null value is merged.
 *
 * The number of non-null values of this batch is reported through SET_VAL, and hasResult is raised
 * as soon as a batch contributed at least one value.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t spreadFunction(SqlFunctionCtx *pCtx) {
  int32_t numOfElems = 0;

  SInputColumnInfoData* pInput = &pCtx->input;
  int32_t               type = pInput->pData[0]->info.type;
  SSpreadInfo*          pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));

  if (pInput->colDataAggIsSet) {
    // Only the pre-computed aggregate is loaded here; the actual row data is not.
    // The aggregate is fetched inside this branch because the raw-data path never needs it.
    SColumnDataAgg* pAgg = pInput->pColumnDataAgg[0];

    numOfElems = pInput->numOfRows - pAgg->numOfNull;
    if (numOfElems != 0) {  // all rows are null: nothing to merge
      double tmin = 0.0, tmax = 0.0;

      // NOTE(review): a column type matching none of the three classes below leaves tmin/tmax at 0.0,
      // which is then merged as if it were data -- confirm that callers only pass numeric columns.
      if (IS_SIGNED_NUMERIC_TYPE(type)) {
        tmin = (double)GET_INT64_VAL(&pAgg->min);
        tmax = (double)GET_INT64_VAL(&pAgg->max);
      } else if (IS_FLOAT_TYPE(type)) {
        tmin = GET_DOUBLE_VAL(&pAgg->min);
        tmax = GET_DOUBLE_VAL(&pAgg->max);
      } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
        tmin = (double)GET_UINT64_VAL(&pAgg->min);
        tmax = (double)GET_UINT64_VAL(&pAgg->max);
      }

      if (GET_DOUBLE_VAL(&pInfo->min) > tmin) {
        SET_DOUBLE_VAL(&pInfo->min, tmin);
      }

      if (GET_DOUBLE_VAL(&pInfo->max) < tmax) {
        SET_DOUBLE_VAL(&pInfo->max, tmax);
      }
    }
  } else {  // computing based on the true data block
    SColumnInfoData* pCol = pInput->pData[0];

    int32_t start = pInput->startRowIndex;
    int32_t end   = start + pInput->numOfRows;

    // check the valid data one by one, skipping null cells
    for (int32_t i = start; i < end; ++i) {
      if (colDataIsNull_f(pCol->nullbitmap, i)) {
        continue;
      }

      char *data = colDataGetData(pCol, i);

      // every supported type is widened to double before it is compared
      double v = 0;
      GET_TYPED_DATA(v, double, type, data);
      if (v < GET_DOUBLE_VAL(&pInfo->min)) {
        SET_DOUBLE_VAL(&pInfo->min, v);
      }

      if (v > GET_DOUBLE_VAL(&pInfo->max)) {
        SET_DOUBLE_VAL(&pInfo->max, v);
      }

      numOfElems += 1;
    }
  }

  // when all checked data are null, numOfElems is 0 and hasResult is left untouched
  SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
  if (numOfElems > 0) {
    pInfo->hasResult = true;
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Produce the final spread value and hand the result entry over to the common finalizer.
 *
 * When at least one value has been accumulated (hasResult is true), result is set to max - min.
 * Otherwise result is left as it is. The return value is whatever functionFinalize() returns.
 */
int32_t spreadFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
  SSpreadInfo* pSpread = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));

  // without any accumulated value, min/max still hold their initial sentinels
  if (pSpread->hasResult == true) {
    SET_DOUBLE_VAL(&pSpread->result, pSpread->max - pSpread->min);
  }

  return functionFinalize(pCtx, pBlock);
}