builtinsimpl.c 57.5 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "builtinsimpl.h"
17
#include "function.h"
18
#include "querynodes.h"
H
Haojun Liao 已提交
19 20
#include "taggfunction.h"
#include "tdatablock.h"
21
#include "tpercentile.h"
H
Haojun Liao 已提交
22

G
Ganlin Zhao 已提交
23 24 25 26 27 28 29 30 31 32 33 34 35 36
// Running sum shared by the sum()/avg() accumulators. Only one member is meaningful at a time and
// the input column type selects it: isum for signed integers (and bool in sumFunction), usum for
// unsigned integers, dsum for float/double.
typedef struct SSumRes {
  union {
    int64_t  isum;
    uint64_t usum;
    double   dsum;
  };
} SSumRes;

// Intermediate state of avg(), stored in the result row's intermediate buffer.
typedef struct SAvgRes {
  double  result;  // final average; written by avgFinalize() as sum / count
  SSumRes sum;     // running sum: isum for integer inputs, dsum for float/double inputs
  int64_t count;   // number of non-NULL values accumulated by avgFunction()
} SAvgRes;

37 38
// One entry of a top/bottom result list.
// NOTE(review): not referenced in the visible part of this file; the field notes below are the
// original author's.
typedef struct STopBotResItem {
  SVariant v;
  uint64_t uid;  // it is a table uid, used to extract tag data during building of the final result for the tag data
  struct {
    int32_t pageId;
    int32_t offset;
  } tuplePos;  // tuple data of this chosen row
} STopBotResItem;

G
Ganlin Zhao 已提交
46
typedef struct STopBotRes {
47
  STopBotResItem* pItems;
G
Ganlin Zhao 已提交
48 49 50 51 52
} STopBotRes;

// Intermediate state of stddev(). Integer inputs use the *ISum/isum members, float/double inputs
// use the *DSum/dsum members (see stddevFunction/stddevFinalize).
typedef struct SStddevRes {
  double  result;  // final standard deviation, written by stddevFinalize()
  int64_t count;   // number of non-NULL values accumulated
  union {
    double  quadraticDSum;  // sum of squares for float/double inputs
    int64_t quadraticISum;  // sum of squares for integer inputs
  };
  union {
    double  dsum;  // plain sum for float/double inputs
    int64_t isum;  // plain sum for integer inputs
  };
} SStddevRes;

typedef struct SPercentileInfo {
  double      result;
65
  tMemBucket* pMemBucket;
G
Ganlin Zhao 已提交
66 67 68 69 70 71 72
  int32_t     stage;
  double      minval;
  double      maxval;
  int64_t     numOfElems;
} SPercentileInfo;

// State kept between rows for a diff-style function.
// NOTE(review): no user of this struct is visible in this part of the file; field meanings are
// inferred from the names only — confirm against the diff implementation.
typedef struct SDiffInfo {
  bool hasPrev;
  bool includeNull;
  bool ignoreNegative;
  bool firstOutput;
  union {
    int64_t i64;
    double  d64;
  } prev;

  int64_t prevTs;
} SDiffInfo;

G
Ganlin Zhao 已提交
85 86 87 88 89 90 91
// Intermediate state for a spread-style function.
// NOTE(review): no user of this struct is visible in this part of the file; presumably result is
// derived from max and min — confirm against the spread implementation.
typedef struct SSpreadInfo {
  double result;
  bool   hasResult;
  double min;
  double max;
} SSpreadInfo;

92 93 94 95 96 97
// Sets (_info)->numOfRes to (res), but only when at least one element was processed
// (numOfElem > 0); otherwise the result entry is left untouched.
#define SET_VAL(_info, numOfElem, res) \
  do {                                 \
    if ((numOfElem) <= 0) {            \
      break;                           \
    }                                  \
    (_info)->numOfRes = (res);         \
  } while (0)

G
Ganlin Zhao 已提交
100 101 102 103 104 105
// Accessors for the timestamp list attached to a function context:
// GET_TS_LIST(x) views (x)->ptsList as a TSKEY array, GET_TS_DATA(x, y) reads its y-th element.
// NOTE: the same two macros are defined again further down in this file; keep both definitions
// token-identical.
#define GET_TS_LIST(x)    ((TSKEY*)((x)->ptsList))
#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)])

// Invokes fpSet.process() on every context in (ctx)->tagInfo.pTagCtxList.
// NOTE(review): the expansion ends with ';', so `MACRO(x);` yields an extra empty statement and
// the macro cannot be used as the body of an if/else without braces. It is kept as-is because the
// same macro is defined again later in this file and both definitions must stay token-identical.
#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx)                      \
  do {                                                             \
    for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \
      SqlFunctionCtx* __ctx = (ctx)->tagInfo.pTagCtxList[_i];      \
      __ctx->fpSet.process(__ctx);                                 \
    }                                                              \
  } while (0);

// Conditionally replaces (left) with (right), then notifies the subsidiary contexts with _ts and
// bumps (num). The update happens when ((left) < (right)) XOR (sign) is non-zero:
//   sign == 0: update when left <  right (max semantics)
//   sign == 1: update when left >= right (min semantics; also fires on equality)
// NOTE: this macro is defined again further down in this file; keep both definitions
// token-identical.
#define UPDATE_DATA(ctx, left, right, num, sign, _ts) \
  do {                                                \
    if (((left) < (right)) ^ (sign)) {                \
      (left) = (right);                               \
      DO_UPDATE_SUBSID_RES(ctx, _ts);                 \
      (num) += 1;                                     \
    }                                                 \
  } while (0)

// Scans rows [_start, _start + _nrow) of column _col viewed as an array of _t, skips NULL rows
// (when the column has NULLs) and feeds every remaining value to UPDATE_DATA against (val).
// The row timestamp passed along is taken from (ctx)->ptsList when present, 0 otherwise.
// NOTE: this macro is defined again further down in this file; keep both definitions
// token-identical.
#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num)        \
  do {                                                                   \
    _t* d = (_t*)((_col)->pData);                                        \
    for (int32_t i = (_start); i < (_nrow) + (_start); ++i) {            \
      if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \
        continue;                                                        \
      }                                                                  \
      TSKEY ts = (ctx)->ptsList != NULL ? GET_TS_DATA(ctx, i) : 0;       \
      UPDATE_DATA(ctx, val, d[i], num, sign, ts);                        \
    }                                                                    \
  } while (0)

132
// Generic one-time setup of a result-row entry.
// Returns false (and does nothing) when pResultInfo is already initialized. Otherwise zeroes the
// output buffer, if any, for resDataInfo.bytes bytes, initialises the entry with the intermediate
// buffer size from pCtx->resDataInfo and returns true.
bool functionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
  if (pResultInfo->initialized) {
    return false;
  }

  if (pCtx->pOutput != NULL) {
    memset(pCtx->pOutput, 0, (size_t)pCtx->resDataInfo.bytes);
  }

  initResultRowEntry(pResultInfo, pCtx->resDataInfo.interBufSize);
  return true;
}

145
// Generic finalize step: appends the start of the entry's intermediate buffer as one cell of the
// output column (slot pCtx->pExpr->base.resSchema.slotId) at row pBlock->info.rows. The cell is
// written as NULL when no result was produced (numOfRes == 0).
// Returns the entry's numOfRes.
int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
  int32_t          slotId = pCtx->pExpr->base.resSchema.slotId;
  SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);

  SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
  pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0;
  /*cleanupResultRowEntry(pResInfo);*/

  char* in = GET_ROWCELL_INTERBUF(pResInfo);
  colDataAppend(pCol, pBlock->info.rows, in, pResInfo->isNullRes);

  return pResInfo->numOfRes;
}

159
// Finalize variant that emits a caller-supplied buffer instead of the intermediate buffer.
// Marks the entry NULL when numOfRes == 0, cleans the entry up, appends finalResult as one cell
// of the output column (slot pCtx->pExpr->base.resSchema.slotId) at row pBlock->info.rows and
// returns the entry's numOfRes as read after the cleanup call.
int32_t functionFinalizeWithResultBuf(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, char* finalResult) {
  int32_t          slotId = pCtx->pExpr->base.resSchema.slotId;
  SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);

  SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
  pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0;
  cleanupResultRowEntry(pResInfo);

  char* in = finalResult;
  colDataAppend(pCol, pBlock->info.rows, in, pResInfo->isNullRes);

  return pResInfo->numOfRes;
}

173 174 175
// Tells the scanner how much data count() needs: when the first parameter is the primary
// timestamp column, nothing has to be loaded; in every other case block statistics are requested.
// pTimeWindow is not consulted.
EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) {
  SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
  if (QUERY_NODE_COLUMN == nodeType(pParam) && PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pParam)->colId) {
    return FUNC_DATA_REQUIRED_NOT_LOAD;
  }
  return FUNC_DATA_REQUIRED_STATIS_LOAD;
}
H
Haojun Liao 已提交
180 181 182 183 184 185 186 187 188 189

// Reports the intermediate buffer size of count(): one 64-bit counter. Always succeeds.
bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
  const size_t counterBytes = sizeof(int64_t);

  pEnv->calcMemSize = counterBytes;
  return true;
}

/*
 * count function does need the finalize, if data is missing, the default value, which is 0, is used
 * count function does not use the pCtx->interResBuf to keep the intermediate buffer
 */
190
int32_t countFunction(SqlFunctionCtx* pCtx) {
H
Haojun Liao 已提交
191 192 193
  int32_t numOfElem = 0;

  /*
H
Haojun Liao 已提交
194 195 196
   * 1. column data missing (schema modified) causes pInputCol->hasNull == true. pInput->colDataAggIsSet == true;
   * 2. for general non-primary key columns, pInputCol->hasNull may be true or false, pInput->colDataAggIsSet == true;
   * 3. for primary key column, pInputCol->hasNull always be false, pInput->colDataAggIsSet == false;
H
Haojun Liao 已提交
197 198
   */
  SInputColumnInfoData* pInput = &pCtx->input;
199
  SColumnInfoData*      pInputCol = pInput->pData[0];
H
Haojun Liao 已提交
200 201 202 203 204 205 206 207 208 209 210 211
  if (pInput->colDataAggIsSet && pInput->totalRows == pInput->numOfRows) {
    numOfElem = pInput->numOfRows - pInput->pColumnDataAgg[0]->numOfNull;
    ASSERT(numOfElem >= 0);
  } else {
    if (pInputCol->hasNull) {
      for (int32_t i = pInput->startRowIndex; i < pInput->startRowIndex + pInput->numOfRows; ++i) {
        if (colDataIsNull(pInputCol, pInput->totalRows, i, NULL)) {
          continue;
        }
        numOfElem += 1;
      }
    } else {
212 213
      // when counting on the primary time stamp column and no statistics data is presented, use the size value
      // directly.
H
Haojun Liao 已提交
214 215 216 217 218
      numOfElem = pInput->numOfRows;
    }
  }

  SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
219 220
  char*                buf = GET_ROWCELL_INTERBUF(pResInfo);
  *((int64_t*)buf) += numOfElem;
H
Haojun Liao 已提交
221 222

  SET_VAL(pResInfo, numOfElem, 1);
wmmhello's avatar
wmmhello 已提交
223
  return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
224 225 226 227
}

// Adds every non-NULL value in rows [_start, _start + _rows) of column _col, viewed as an array
// of _t, to (_res) and increments (numOfElem) once per value added.
#define LIST_ADD_N(_res, _col, _start, _rows, _t, numOfElem)             \
  do {                                                                   \
    _t* d = (_t*)((_col)->pData);                                        \
    for (int32_t i = (_start); i < (_rows) + (_start); ++i) {            \
      if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \
        continue;                                                        \
      }                                                                  \
      (_res) += (d)[i];                                                  \
      (numOfElem)++;                                                     \
    }                                                                    \
  } while (0)

238
// sum(): accumulates the current input block into the SSumRes kept in the result entry's
// intermediate buffer. When pre-computed block statistics are available they are used directly;
// otherwise the rows [startRowIndex, startRowIndex + numOfRows) are scanned and NULLs skipped.
// Returns TSDB_CODE_SUCCESS.
int32_t sumFunction(SqlFunctionCtx* pCtx) {
  int32_t numOfElem = 0;

  SInputColumnInfoData* pInput = &pCtx->input;
  int32_t               type = pInput->pData[0]->info.type;

  SSumRes* pSumRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));

  if (pInput->colDataAggIsSet) {
    // Only the pre-computed information is loaded, the actual data is not.
    // The aggregate slot is read only inside this branch, where it is known to be set.
    SColumnDataAgg* pAgg = pInput->pColumnDataAgg[0];

    numOfElem = pInput->numOfRows - pAgg->numOfNull;
    ASSERT(numOfElem >= 0);

    if (IS_SIGNED_NUMERIC_TYPE(type)) {
      pSumRes->isum += pAgg->sum;
    } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
      pSumRes->usum += pAgg->sum;
    } else if (IS_FLOAT_TYPE(type)) {
      // for floating point columns the statistics slot holds the bit pattern of a double
      pSumRes->dsum += GET_DOUBLE_VAL((const char*)&(pAgg->sum));
    }
  } else {  // computing based on the true data block
    SColumnInfoData* pCol = pInput->pData[0];

    int32_t start = pInput->startRowIndex;
    int32_t numOfRows = pInput->numOfRows;

    if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL) {
      if (type == TSDB_DATA_TYPE_TINYINT || type == TSDB_DATA_TYPE_BOOL) {
        LIST_ADD_N(pSumRes->isum, pCol, start, numOfRows, int8_t, numOfElem);
      } else if (type == TSDB_DATA_TYPE_SMALLINT) {
        LIST_ADD_N(pSumRes->isum, pCol, start, numOfRows, int16_t, numOfElem);
      } else if (type == TSDB_DATA_TYPE_INT) {
        LIST_ADD_N(pSumRes->isum, pCol, start, numOfRows, int32_t, numOfElem);
      } else if (type == TSDB_DATA_TYPE_BIGINT) {
        LIST_ADD_N(pSumRes->isum, pCol, start, numOfRows, int64_t, numOfElem);
      }
    } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
      if (type == TSDB_DATA_TYPE_UTINYINT) {
        LIST_ADD_N(pSumRes->usum, pCol, start, numOfRows, uint8_t, numOfElem);
      } else if (type == TSDB_DATA_TYPE_USMALLINT) {
        LIST_ADD_N(pSumRes->usum, pCol, start, numOfRows, uint16_t, numOfElem);
      } else if (type == TSDB_DATA_TYPE_UINT) {
        LIST_ADD_N(pSumRes->usum, pCol, start, numOfRows, uint32_t, numOfElem);
      } else if (type == TSDB_DATA_TYPE_UBIGINT) {
        LIST_ADD_N(pSumRes->usum, pCol, start, numOfRows, uint64_t, numOfElem);
      }
    } else if (type == TSDB_DATA_TYPE_DOUBLE) {
      LIST_ADD_N(pSumRes->dsum, pCol, start, numOfRows, double, numOfElem);
    } else if (type == TSDB_DATA_TYPE_FLOAT) {
      LIST_ADD_N(pSumRes->dsum, pCol, start, numOfRows, float, numOfElem);
    }
  }

  // data in the check operation are all null, not output
  SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
297
// Reports the intermediate buffer size of sum(): one SSumRes. Always succeeds.
bool getSumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
  pEnv->calcMemSize = sizeof(SSumRes);
  return true;
}

G
Ganlin Zhao 已提交
302 303 304 305 306
// Reports the intermediate buffer size of avg(). Always succeeds.
// avgFunctionSetup() zeroes, and avgFunction()/avgFinalize() read and write, a complete SAvgRes
// (result + sum + count) in that buffer, so the whole struct must be reserved; reserving only
// sizeof(double) would let them write past the end of the buffer.
bool getAvgFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
  pEnv->calcMemSize = sizeof(SAvgRes);
  return true;
}

307
// Setup for avg(): runs the generic setup and, if the entry was not initialized before, clears
// the SAvgRes held in the intermediate buffer. Returns false when the entry was already set up.
bool avgFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
  if (!functionSetup(pCtx, pResultInfo)) {
    return false;
  }

  SAvgRes* pRes = GET_ROWCELL_INTERBUF(pResultInfo);
  memset(pRes, 0, sizeof(SAvgRes));
  return true;
}

// Helper for avgFunction(): walks rows [start, end) of pCol viewed as an array of _t, skips NULL
// rows and adds every remaining value to _sum while counting it. It relies on the locals of
// avgFunction() (pCol, start, end, numOfElem, pAvgRes) and is undefined right after the function.
#define AVG_ADD_ROWS(_t, _sum)                                     \
  do {                                                             \
    const _t* plist = (const _t*)pCol->pData;                      \
    for (int32_t i = start; i < end; ++i) {                        \
      if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { \
        continue;                                                  \
      }                                                            \
      numOfElem += 1;                                              \
      pAvgRes->count += 1;                                         \
      (_sum) += plist[i];                                          \
    }                                                              \
  } while (0)

// avg(): accumulates the sum and the count of the non-NULL values of the current input block into
// the SAvgRes kept in the result entry's intermediate buffer. Signed integer columns add to
// sum.isum, float/double columns add to sum.dsum; any other column type is ignored.
// Returns TSDB_CODE_SUCCESS.
int32_t avgFunction(SqlFunctionCtx* pCtx) {
  int32_t numOfElem = 0;

  SInputColumnInfoData* pInput = &pCtx->input;
  int32_t               type = pInput->pData[0]->info.type;

  SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));

  // computing based on the true data block
  SColumnInfoData* pCol = pInput->pData[0];

  int32_t start = pInput->startRowIndex;
  int32_t end = pInput->numOfRows + start;

  switch (type) {
    case TSDB_DATA_TYPE_TINYINT:
      AVG_ADD_ROWS(int8_t, pAvgRes->sum.isum);
      break;

    case TSDB_DATA_TYPE_SMALLINT:
      AVG_ADD_ROWS(int16_t, pAvgRes->sum.isum);
      break;

    case TSDB_DATA_TYPE_INT:
      AVG_ADD_ROWS(int32_t, pAvgRes->sum.isum);
      break;

    case TSDB_DATA_TYPE_BIGINT:
      AVG_ADD_ROWS(int64_t, pAvgRes->sum.isum);
      break;

    case TSDB_DATA_TYPE_FLOAT:
      AVG_ADD_ROWS(float, pAvgRes->sum.dsum);
      break;

    case TSDB_DATA_TYPE_DOUBLE:
      AVG_ADD_ROWS(double, pAvgRes->sum.dsum);
      break;

    default:
      break;
  }

  // data in the check operation are all null, not output
  SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
  return TSDB_CODE_SUCCESS;
}

#undef AVG_ADD_ROWS

H
Haojun Liao 已提交
428
// Finalize step of avg(): stores sum / count in SAvgRes.result (integer inputs use sum.isum, all
// others sum.dsum) and then emits it through functionFinalize(), whose return value is returned.
// With count == 0 the division produces a non-finite value; functionFinalize() writes the cell as
// NULL whenever the entry's numOfRes is 0.
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
  SInputColumnInfoData* pInput = &pCtx->input;
  int32_t               type = pInput->pData[0]->info.type;
  SAvgRes*              pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
  if (IS_INTEGER_TYPE(type)) {
    pAvgRes->result = pAvgRes->sum.isum / ((double)pAvgRes->count);
  } else {
    pAvgRes->result = pAvgRes->sum.dsum / ((double)pAvgRes->count);
  }

  return functionFinalize(pCtx, pBlock);
}

441
// Data requirement callback that always asks for block statistics; neither argument is consulted.
EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) {
  return FUNC_DATA_REQUIRED_STATIS_LOAD;
}

445
// Setup for max(): after the generic setup, seeds the intermediate buffer with the smallest value
// representable in the result type so that the first real value replaces it.
// Returns false when the entry was already initialized, true otherwise.
// An unsupported result type trips assert(0).
bool maxFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
  if (!functionSetup(pCtx, pResultInfo)) {
    return false;
  }

  char* buf = GET_ROWCELL_INTERBUF(pResultInfo);
  switch (pCtx->resDataInfo.type) {
    case TSDB_DATA_TYPE_INT:
      *((int32_t*)buf) = INT32_MIN;
      break;
    case TSDB_DATA_TYPE_UINT:
      *((uint32_t*)buf) = 0;
      break;
    case TSDB_DATA_TYPE_FLOAT:
      *((float*)buf) = -FLT_MAX;
      break;
    case TSDB_DATA_TYPE_DOUBLE:
      SET_DOUBLE_VAL(((double*)buf), -DBL_MAX);
      break;
    case TSDB_DATA_TYPE_BIGINT:
      *((int64_t*)buf) = INT64_MIN;
      break;
    case TSDB_DATA_TYPE_UBIGINT:
      *((uint64_t*)buf) = 0;
      break;
    case TSDB_DATA_TYPE_SMALLINT:
      *((int16_t*)buf) = INT16_MIN;
      break;
    case TSDB_DATA_TYPE_USMALLINT:
      *((uint16_t*)buf) = 0;
      break;
    case TSDB_DATA_TYPE_TINYINT:
      *((int8_t*)buf) = INT8_MIN;
      break;
    case TSDB_DATA_TYPE_UTINYINT:
      *((uint8_t*)buf) = 0;
      break;
    case TSDB_DATA_TYPE_BOOL:
      *((int8_t*)buf) = 0;
      break;
    default:
      assert(0);
  }
  return true;
}

491
// Setup for min(): after the generic setup, seeds the intermediate buffer with the largest value
// representable in the result type so that the first real value replaces it.
// Returns false when the entry was already initialized, true otherwise.
// An unsupported result type trips assert(0).
bool minFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
  if (!functionSetup(pCtx, pResultInfo)) {
    return false;  // not initialized since it has been initialized
  }

  char* buf = GET_ROWCELL_INTERBUF(pResultInfo);
  switch (pCtx->resDataInfo.type) {
    case TSDB_DATA_TYPE_TINYINT:
      *((int8_t*)buf) = INT8_MAX;
      break;
    case TSDB_DATA_TYPE_UTINYINT:
      *(uint8_t*)buf = UINT8_MAX;
      break;
    case TSDB_DATA_TYPE_SMALLINT:
      *((int16_t*)buf) = INT16_MAX;
      break;
    case TSDB_DATA_TYPE_USMALLINT:
      *((uint16_t*)buf) = UINT16_MAX;
      break;
    case TSDB_DATA_TYPE_INT:
      *((int32_t*)buf) = INT32_MAX;
      break;
    case TSDB_DATA_TYPE_UINT:
      *((uint32_t*)buf) = UINT32_MAX;
      break;
    case TSDB_DATA_TYPE_BIGINT:
      *((int64_t*)buf) = INT64_MAX;
      break;
    case TSDB_DATA_TYPE_UBIGINT:
      *((uint64_t*)buf) = UINT64_MAX;
      break;
    case TSDB_DATA_TYPE_FLOAT:
      *((float*)buf) = FLT_MAX;
      break;
    case TSDB_DATA_TYPE_DOUBLE:
      SET_DOUBLE_VAL(((double*)buf), DBL_MAX);
      break;
    case TSDB_DATA_TYPE_BOOL:
      *((int8_t*)buf) = 1;
      break;
    default:
      assert(0);
  }

  return true;
}

H
Haojun Liao 已提交
538
// Reports the intermediate buffer size of min()/max(): 8 bytes, enough for the widest supported
// value type (int64_t/uint64_t/double). Always succeeds.
bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
  pEnv->calcMemSize = sizeof(int64_t);
  return true;
}

// Accessors for the timestamp list attached to a function context:
// GET_TS_LIST(x) views (x)->ptsList as a TSKEY array, GET_TS_DATA(x, y) reads its y-th element.
// NOTE: these two macros are also defined earlier in this file; keep both definitions
// token-identical.
#define GET_TS_LIST(x)    ((TSKEY*)((x)->ptsList))
#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)])

// Invokes fpSet.process() on every context in (ctx)->tagInfo.pTagCtxList.
// NOTE(review): the expansion ends with ';', so `MACRO(x);` yields an extra empty statement and
// the macro cannot be used as the body of an if/else without braces. It is kept as-is because the
// same macro is also defined earlier in this file and both definitions must stay token-identical.
#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx)                      \
  do {                                                             \
    for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \
      SqlFunctionCtx* __ctx = (ctx)->tagInfo.pTagCtxList[_i];      \
      __ctx->fpSet.process(__ctx);                                 \
    }                                                              \
  } while (0);

554 555
// Runs fpSet.process() on every subsidiary context of (ctx). Contexts whose functionId is
// FUNCTION_TS_DUMMY first receive (ts) as a BIGINT tag value.
#define DO_UPDATE_SUBSID_RES(ctx, ts)                          \
  do {                                                         \
    for (int32_t _i = 0; _i < (ctx)->subsidiaries.num; ++_i) { \
      SqlFunctionCtx* __ctx = (ctx)->subsidiaries.pCtx[_i];    \
      if (__ctx->functionId == FUNCTION_TS_DUMMY) {            \
        __ctx->tag.i = (ts);                                   \
        __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT;              \
      }                                                        \
      __ctx->fpSet.process(__ctx);                             \
    }                                                          \
  } while (0)

// Conditionally replaces (left) with (right), then notifies the subsidiary contexts with _ts and
// bumps (num). The update happens when ((left) < (right)) XOR (sign) is non-zero:
//   sign == 0: update when left <  right (max semantics)
//   sign == 1: update when left >= right (min semantics; also fires on equality)
// NOTE: this macro is also defined earlier in this file; keep both definitions token-identical.
#define UPDATE_DATA(ctx, left, right, num, sign, _ts) \
  do {                                                \
    if (((left) < (right)) ^ (sign)) {                \
      (left) = (right);                               \
      DO_UPDATE_SUBSID_RES(ctx, _ts);                 \
      (num) += 1;                                     \
    }                                                 \
  } while (0)

H
Haojun Liao 已提交
575
// Scans rows [_start, _start + _nrow) of column _col viewed as an array of _t, skips NULL rows
// (when the column has NULLs) and feeds every remaining value to UPDATE_DATA against (val).
// The row timestamp passed along is taken from (ctx)->ptsList when present, 0 otherwise.
// NOTE: this macro is also defined earlier in this file; keep both definitions token-identical.
#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num)        \
  do {                                                                   \
    _t* d = (_t*)((_col)->pData);                                        \
    for (int32_t i = (_start); i < (_nrow) + (_start); ++i) {            \
      if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \
        continue;                                                        \
      }                                                                  \
      TSKEY ts = (ctx)->ptsList != NULL ? GET_TS_DATA(ctx, i) : 0;       \
      UPDATE_DATA(ctx, val, d[i], num, sign, ts);                        \
    }                                                                    \
  } while (0)

587
// Shared implementation of min() (isMinFunc == 1) and max() (isMinFunc == 0).
// The current extreme lives at the start of the result entry's intermediate buffer and is
// compared with either the pre-computed block statistics (when pInput->colDataAggIsSet) or every
// non-NULL row in [startRowIndex, startRowIndex + numOfRows). Whenever the extreme changes, the
// subsidiary contexts are re-run with the timestamp of the winning row.
// Returns a count the callers test for > 0: numOfRows - numOfNull on the statistics path (plus
// one if a float/double update happened); on the row path the number of non-NULL rows for INT
// columns and the number of updates performed for every other type.
int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
  int32_t numOfElems = 0;

  SInputColumnInfoData* pInput = &pCtx->input;
  SColumnInfoData*      pCol = pInput->pData[0];
  int32_t               type = pCol->info.type;

  SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
  char*                buf = GET_ROWCELL_INTERBUF(pResInfo);

  // data in current data block are qualified to the query
  if (pInput->colDataAggIsSet) {
    // the aggregate slot is read only here, where it is known to be set
    SColumnDataAgg* pAgg = pInput->pColumnDataAgg[0];

    numOfElems = pInput->numOfRows - pAgg->numOfNull;
    ASSERT(pInput->numOfRows == pInput->totalRows && numOfElems >= 0);

    if (numOfElems == 0) {
      return numOfElems;
    }

    void*   tval = NULL;
    int16_t index = 0;

    if (isMinFunc) {
      tval = &pAgg->min;
      index = pAgg->minIndex;
    } else {
      tval = &pAgg->max;
      index = pAgg->maxIndex;
    }

    // the index is the original position, not the relative position
    TSKEY key = (pCtx->ptsList != NULL) ? pCtx->ptsList[index] : TSKEY_INITIAL_VAL;

    if (IS_SIGNED_NUMERIC_TYPE(type)) {
      int64_t prev = 0;
      GET_TYPED_DATA(prev, int64_t, type, buf);

      int64_t val = GET_INT64_VAL(tval);
      if ((prev < val) ^ isMinFunc) {
        // NOTE(review): stores all 8 bytes regardless of the column width, while the row path
        // below reads the buffer back with the column's own width — confirm byte-order assumptions.
        *(int64_t*)buf = val;
        DO_UPDATE_SUBSID_RES(pCtx, key);
      }
    } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
      uint64_t prev = 0;
      GET_TYPED_DATA(prev, uint64_t, type, buf);

      uint64_t val = GET_UINT64_VAL(tval);
      if ((prev < val) ^ isMinFunc) {
        *(uint64_t*)buf = val;
        DO_UPDATE_SUBSID_RES(pCtx, key);
      }
    } else if (type == TSDB_DATA_TYPE_DOUBLE) {
      double val = GET_DOUBLE_VAL(tval);
      UPDATE_DATA(pCtx, *(double*)buf, val, numOfElems, isMinFunc, key);
    } else if (type == TSDB_DATA_TYPE_FLOAT) {
      double val = GET_DOUBLE_VAL(tval);
      UPDATE_DATA(pCtx, *(float*)buf, val, numOfElems, isMinFunc, key);
    }

    return numOfElems;
  }

  int32_t start = pInput->startRowIndex;
  int32_t numOfRows = pInput->numOfRows;

  if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL) {
    if (type == TSDB_DATA_TYPE_TINYINT || type == TSDB_DATA_TYPE_BOOL) {
      LOOPCHECK_N(*(int8_t*)buf, pCol, pCtx, int8_t, numOfRows, start, isMinFunc, numOfElems);
    } else if (type == TSDB_DATA_TYPE_SMALLINT) {
      LOOPCHECK_N(*(int16_t*)buf, pCol, pCtx, int16_t, numOfRows, start, isMinFunc, numOfElems);
    } else if (type == TSDB_DATA_TYPE_INT) {
      int32_t* pData = (int32_t*)pCol->pData;
      int32_t* val = (int32_t*)buf;

      for (int32_t i = start; i < start + numOfRows; ++i) {
        if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) {
          continue;
        }

        if ((*val < pData[i]) ^ isMinFunc) {
          *val = pData[i];
          TSKEY ts = (pCtx->ptsList != NULL) ? GET_TS_DATA(pCtx, i) : 0;
          DO_UPDATE_SUBSID_RES(pCtx, ts);
        }

        numOfElems += 1;
      }

#if defined(_DEBUG_VIEW)
      // log the current extreme (the original referenced an undeclared `retVal` here)
      qDebug("max value updated:%d", *val);
#endif
    } else if (type == TSDB_DATA_TYPE_BIGINT) {
      LOOPCHECK_N(*(int64_t*)buf, pCol, pCtx, int64_t, numOfRows, start, isMinFunc, numOfElems);
    }
  } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
    if (type == TSDB_DATA_TYPE_UTINYINT) {
      LOOPCHECK_N(*(uint8_t*)buf, pCol, pCtx, uint8_t, numOfRows, start, isMinFunc, numOfElems);
    } else if (type == TSDB_DATA_TYPE_USMALLINT) {
      LOOPCHECK_N(*(uint16_t*)buf, pCol, pCtx, uint16_t, numOfRows, start, isMinFunc, numOfElems);
    } else if (type == TSDB_DATA_TYPE_UINT) {
      LOOPCHECK_N(*(uint32_t*)buf, pCol, pCtx, uint32_t, numOfRows, start, isMinFunc, numOfElems);
    } else if (type == TSDB_DATA_TYPE_UBIGINT) {
      LOOPCHECK_N(*(uint64_t*)buf, pCol, pCtx, uint64_t, numOfRows, start, isMinFunc, numOfElems);
    }
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    LOOPCHECK_N(*(double*)buf, pCol, pCtx, double, numOfRows, start, isMinFunc, numOfElems);
  } else if (type == TSDB_DATA_TYPE_FLOAT) {
    LOOPCHECK_N(*(float*)buf, pCol, pCtx, float, numOfRows, start, isMinFunc, numOfElems);
  }

  return numOfElems;
}
717

718
// min(): delegates to doMinMaxHelper() in "min" mode and marks the entry as having one result
// when the helper reports a positive count. Returns TSDB_CODE_SUCCESS.
int32_t minFunction(SqlFunctionCtx* pCtx) {
  int32_t numOfElems = doMinMaxHelper(pCtx, 1);
  SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
  return TSDB_CODE_SUCCESS;
}

724
// max(): delegates to doMinMaxHelper() in "max" mode and marks the entry as having one result
// when the helper reports a positive count. Returns TSDB_CODE_SUCCESS.
int32_t maxFunction(SqlFunctionCtx* pCtx) {
  int32_t numOfElems = doMinMaxHelper(pCtx, 0);
  SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
  return TSDB_CODE_SUCCESS;
}

// Reports the intermediate buffer size of stddev(): one SStddevRes. pFunc is not consulted.
// Always succeeds.
bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
  const size_t stateBytes = sizeof(SStddevRes);

  pEnv->calcMemSize = stateBytes;
  return true;
}

735
// Setup for stddev(): runs the generic setup and, if the entry was not initialized before, clears
// the SStddevRes held in the intermediate buffer. Returns false when the entry was already set up.
bool stddevFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
  if (!functionSetup(pCtx, pResultInfo)) {
    return false;
  }

  SStddevRes* pRes = GET_ROWCELL_INTERBUF(pResultInfo);
  memset(pRes, 0, sizeof(SStddevRes));
  return true;
}

H
Haojun Liao 已提交
745
// Helper for stddevFunction(): walks rows [start, end) of pCol viewed as an array of _t, skips
// NULL rows and adds each remaining value to _sum and its square to _qsum. The value is first
// converted to _wide so that the square is computed in that type. It relies on the locals of
// stddevFunction() (pCol, start, end, numOfElem, pStddevRes) and is undefined right after it.
#define STDDEV_ADD_ROWS(_t, _wide, _sum, _qsum)                    \
  do {                                                             \
    const _t* plist = (const _t*)pCol->pData;                      \
    for (int32_t i = start; i < end; ++i) {                        \
      if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { \
        continue;                                                  \
      }                                                            \
      const _wide v = plist[i];                                    \
      numOfElem += 1;                                              \
      pStddevRes->count += 1;                                      \
      (_sum) += v;                                                 \
      (_qsum) += v * v;                                            \
    }                                                              \
  } while (0)

// stddev(): accumulates count, sum and sum of squares of the non-NULL values of the current input
// block into the SStddevRes kept in the result entry's intermediate buffer. Signed integer columns
// use isum/quadraticISum, float/double columns use dsum/quadraticDSum; other types are ignored.
// Integer squares are computed in 64 bits: squaring an int32_t in plain int arithmetic overflows
// for |x| > 46340. (A BIGINT square can still exceed int64_t; that is not addressed here.)
// Returns TSDB_CODE_SUCCESS.
int32_t stddevFunction(SqlFunctionCtx* pCtx) {
  int32_t numOfElem = 0;

  SInputColumnInfoData* pInput = &pCtx->input;
  int32_t               type = pInput->pData[0]->info.type;

  SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));

  // computing based on the true data block
  SColumnInfoData* pCol = pInput->pData[0];

  int32_t start = pInput->startRowIndex;
  int32_t end = pInput->numOfRows + start;

  switch (type) {
    case TSDB_DATA_TYPE_TINYINT:
      STDDEV_ADD_ROWS(int8_t, int64_t, pStddevRes->isum, pStddevRes->quadraticISum);
      break;

    case TSDB_DATA_TYPE_SMALLINT:
      STDDEV_ADD_ROWS(int16_t, int64_t, pStddevRes->isum, pStddevRes->quadraticISum);
      break;

    case TSDB_DATA_TYPE_INT:
      STDDEV_ADD_ROWS(int32_t, int64_t, pStddevRes->isum, pStddevRes->quadraticISum);
      break;

    case TSDB_DATA_TYPE_BIGINT:
      STDDEV_ADD_ROWS(int64_t, int64_t, pStddevRes->isum, pStddevRes->quadraticISum);
      break;

    case TSDB_DATA_TYPE_FLOAT:
      STDDEV_ADD_ROWS(float, float, pStddevRes->dsum, pStddevRes->quadraticDSum);
      break;

    case TSDB_DATA_TYPE_DOUBLE:
      STDDEV_ADD_ROWS(double, double, pStddevRes->dsum, pStddevRes->quadraticDSum);
      break;

    default:
      break;
  }

  // data in the check operation are all null, not output
  SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
  return TSDB_CODE_SUCCESS;
}

#undef STDDEV_ADD_ROWS

H
Haojun Liao 已提交
862
// Finalize step of stddev(): computes sqrt(E[x^2] - E[x]^2) from the accumulated sums (integer
// inputs use isum/quadraticISum, all others dsum/quadraticDSum), stores it in SStddevRes.result
// and emits it through functionFinalize(), whose return value is returned.
// Floating point rounding can make E[x^2] - E[x]^2 come out slightly below zero for (nearly)
// constant data, which would turn the result into NaN; such values are clamped to 0. A NaN
// produced by count == 0 is left untouched (the comparison is false for NaN).
int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
  SInputColumnInfoData* pInput = &pCtx->input;
  int32_t               type = pInput->pData[0]->info.type;
  SStddevRes*           pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
  double                avg;
  double                variance;
  if (IS_INTEGER_TYPE(type)) {
    avg = pStddevRes->isum / ((double)pStddevRes->count);
    variance = pStddevRes->quadraticISum / ((double)pStddevRes->count) - avg * avg;
  } else {
    avg = pStddevRes->dsum / ((double)pStddevRes->count);
    variance = pStddevRes->quadraticDSum / ((double)pStddevRes->count) - avg * avg;
  }

  if (variance < 0) {
    variance = 0;
  }
  pStddevRes->result = sqrt(variance);

  return functionFinalize(pCtx, pBlock);
}

// Report the size of the intermediate buffer used by percentile: one SPercentileInfo.
// pFunc is not consulted; the function always succeeds.
bool getPercentileFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
  (void)pFunc;

  pEnv->calcMemSize = sizeof(SPercentileInfo);

  return true;
}

883
// Prepare the percentile state for the first scan round, which collects the min/max of all involved
// data: the element counter is cleared and min/max start from the extreme double values.
// Returns false (leaving the state untouched) when the generic functionSetup() fails.
bool percentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
  bool ready = functionSetup(pCtx, pResultInfo);

  if (ready) {
    SPercentileInfo* pPercentile = GET_ROWCELL_INTERBUF(pResultInfo);

    SET_DOUBLE_VAL(&pPercentile->minval, DBL_MAX);
    SET_DOUBLE_VAL(&pPercentile->maxval, -DBL_MAX);
    pPercentile->numOfElems = 0;
  }

  return ready;
}

897 898 899
/*
 * Two-stage percentile accumulation.
 *
 * Stage 0 (pInfo->stage == 0): only the min/max value and the number of non-null elements are gathered,
 * either from the pre-computed column aggregates or by visiting every row.
 * Stage 1 (entered on the first call with pCtx->currentStage == REPEAT_SCAN): a memory bucket is created
 * from the gathered min/max and every non-null value is put into it.
 */
int32_t percentileFunction(SqlFunctionCtx* pCtx) {
  int32_t              notNullElems = 0;
  SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);

  SInputColumnInfoData* pInput = &pCtx->input;
  SColumnInfoData*      pCol = pInput->pData[0];
  int32_t               type = pCol->info.type;

  SPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
  if (pCtx->currentStage == REPEAT_SCAN && pInfo->stage == 0) {
    pInfo->stage += 1;

    // all data are null, set it completed
    if (pInfo->numOfElems == 0) {
      pResInfo->complete = true;
      return 0;
    } else {
      // NOTE(review): the result of tMemBucketCreate() is not checked here; confirm whether it can
      // return NULL and, if so, which error code should be reported.
      pInfo->pMemBucket = tMemBucketCreate(pCtx->inputBytes, pCtx->inputType, pInfo->minval, pInfo->maxval);
    }
  }

  // the first stage, only acquire the min/max value
  if (pInfo->stage == 0) {
    if (pInput->colDataAggIsSet) {
      // The aggregate entry is fetched only inside this branch, mirroring the guarded access used by
      // firstFunction()/lastFunction(); it is not needed when the rows are scanned one by one.
      SColumnDataAgg* pAgg = pInput->pColumnDataAgg[0];

      double tmin = 0.0, tmax = 0.0;
      if (IS_SIGNED_NUMERIC_TYPE(type)) {
        tmin = (double)GET_INT64_VAL(&pAgg->min);
        tmax = (double)GET_INT64_VAL(&pAgg->max);
      } else if (IS_FLOAT_TYPE(type)) {
        tmin = GET_DOUBLE_VAL(&pAgg->min);
        tmax = GET_DOUBLE_VAL(&pAgg->max);
      } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
        tmin = (double)GET_UINT64_VAL(&pAgg->min);
        tmax = (double)GET_UINT64_VAL(&pAgg->max);
      }

      if (GET_DOUBLE_VAL(&pInfo->minval) > tmin) {
        SET_DOUBLE_VAL(&pInfo->minval, tmin);
      }

      if (GET_DOUBLE_VAL(&pInfo->maxval) < tmax) {
        SET_DOUBLE_VAL(&pInfo->maxval, tmax);
      }

      pInfo->numOfElems += (pInput->numOfRows - pAgg->numOfNull);
    } else {
      // check the valid data one by one
      int32_t start = pInput->startRowIndex;
      for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
        if (colDataIsNull_f(pCol->nullbitmap, i)) {
          continue;
        }

        char* data = colDataGetData(pCol, i);

        double v = 0;
        GET_TYPED_DATA(v, double, pCtx->inputType, data);
        if (v < GET_DOUBLE_VAL(&pInfo->minval)) {
          SET_DOUBLE_VAL(&pInfo->minval, v);
        }

        if (v > GET_DOUBLE_VAL(&pInfo->maxval)) {
          SET_DOUBLE_VAL(&pInfo->maxval, v);
        }

        pInfo->numOfElems += 1;
      }
    }

    return 0;
  }

  // the second stage, calculate the true percentile value
  int32_t start = pInput->startRowIndex;
  for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
    if (colDataIsNull_f(pCol->nullbitmap, i)) {
      continue;
    }

    char* data = colDataGetData(pCol, i);

    notNullElems += 1;
    tMemBucketPut(pInfo->pMemBucket, data, 1);
  }

  SET_VAL(pResInfo, notNullElems, 1);
  return TSDB_CODE_SUCCESS;
}

988
/*
 * Compute the requested percentile from the memory bucket filled during the second scan stage, release
 * the bucket and hand over to the generic finalizer.
 *
 * The percentile to compute is read from the second function parameter; it is taken from the integer
 * field when the variant type is TSDB_DATA_TYPE_INT and from the double field otherwise.
 */
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
  SVariant* pVal = &pCtx->param[1].param;
  double    v = pVal->nType == TSDB_DATA_TYPE_INT ? pVal->i : pVal->d;

  SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
  SPercentileInfo*     ppInfo = (SPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo);

  tMemBucket* pMemBucket = ppInfo->pMemBucket;
  if (pMemBucket != NULL && pMemBucket->total > 0) {  // check for null
    SET_DOUBLE_VAL(&ppInfo->result, getPercentile(pMemBucket, v));
  }

  tMemBucketDestroy(pMemBucket);
  // Do not leave a dangling pointer in the result cell: a second finalize (or any later reader of the
  // cell) would otherwise touch or free the already destroyed bucket.
  ppInfo->pMemBucket = NULL;

  return functionFinalize(pCtx, pBlock);
}
H
Haojun Liao 已提交
1003

H
Haojun Liao 已提交
1004 1005
// Size the intermediate buffer of first/last: the bytes of the first parameter's result type followed
// by one int64_t (firstFunction()/lastFunction() keep a timestamp right behind the value).
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
  SColumnNode* pColNode = nodesListGetNode(pFunc->pParameterList, 0);

  pEnv->calcMemSize = sizeof(int64_t) + pColNode->node.resType.bytes;

  return true;
}

1010 1011 1012 1013 1014
// Fetch the timestamp stored at rowIndex of the given timestamp column; yields 0 when no timestamp
// column is supplied.
static FORCE_INLINE TSKEY getRowPTs(SColumnInfoData* pTsColInfo, int32_t rowIndex) {
  return (pTsColInfo != NULL) ? *(TSKEY*)colDataGetData(pTsColInfo, rowIndex) : 0;
}

1018 1019
// This ordinary first function does not care if current scan is ascending order or descending order scan
// the OPTIMIZED version of first function will only handle the ascending order scan
1020
int32_t firstFunction(SqlFunctionCtx* pCtx) {
H
Haojun Liao 已提交
1021 1022
  int32_t numOfElems = 0;

1023 1024
  SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
  char*                buf = GET_ROWCELL_INTERBUF(pResInfo);
H
Haojun Liao 已提交
1025 1026

  SInputColumnInfoData* pInput = &pCtx->input;
1027
  SColumnInfoData*      pInputCol = pInput->pData[0];
H
Haojun Liao 已提交
1028

1029 1030
  int32_t bytes = pInputCol->info.bytes;

H
Haojun Liao 已提交
1031
  // All null data column, return directly.
H
Haojun Liao 已提交
1032
  if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) {
H
Haojun Liao 已提交
1033
    ASSERT(pInputCol->hasNull == true);
H
Haojun Liao 已提交
1034
    return 0;
H
Haojun Liao 已提交
1035 1036
  }

1037
  SColumnDataAgg* pColAgg = (pInput->colDataAggIsSet) ? pInput->pColumnDataAgg[0] : NULL;
1038

1039 1040
  TSKEY startKey = getRowPTs(pInput->pPTS, 0);
  TSKEY endKey = getRowPTs(pInput->pPTS, pInput->totalRows - 1);
1041

1042
  int32_t blockDataOrder = (startKey <= endKey) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057

  if (blockDataOrder == TSDB_ORDER_ASC) {
    // filter according to current result firstly
    if (pResInfo->numOfRes > 0) {
      TSKEY ts = *(TSKEY*)(buf + bytes);
      if (ts < startKey) {
        return TSDB_CODE_SUCCESS;
      }
    }

    for (int32_t i = pInput->startRowIndex; i < pInput->startRowIndex + pInput->numOfRows; ++i) {
      if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
        continue;
      }

1058 1059
      numOfElems++;

1060
      char* data = colDataGetData(pInputCol, i);
1061
      TSKEY cts = getRowPTs(pInput->pPTS, i);
1062

1063
      if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) > cts) {
1064 1065
        memcpy(buf, data, bytes);
        *(TSKEY*)(buf + bytes) = cts;
1066
        //        DO_UPDATE_TAG_COLUMNS(pCtx, ts);
1067 1068

        pResInfo->numOfRes = 1;
1069
        break;
1070 1071 1072 1073 1074 1075 1076 1077 1078 1079
      }
    }
  } else {
    // in case of descending order time stamp serial, which usually happens as the results of the nest query,
    // all data needs to be check.
    if (pResInfo->numOfRes > 0) {
      TSKEY ts = *(TSKEY*)(buf + bytes);
      if (ts < endKey) {
        return TSDB_CODE_SUCCESS;
      }
H
Haojun Liao 已提交
1080 1081
    }

1082 1083 1084 1085 1086
    for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) {
      if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
        continue;
      }

1087 1088
      numOfElems++;

1089
      char* data = colDataGetData(pInputCol, i);
1090
      TSKEY cts = getRowPTs(pInput->pPTS, i);
1091

1092
      if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) > cts) {
1093 1094
        memcpy(buf, data, bytes);
        *(TSKEY*)(buf + bytes) = cts;
1095
        //        DO_UPDATE_TAG_COLUMNS(pCtx, ts);
1096
        pResInfo->numOfRes = 1;
1097
        break;
1098 1099
      }
    }
H
Haojun Liao 已提交
1100 1101 1102
  }

  SET_VAL(pResInfo, numOfElems, 1);
wmmhello's avatar
wmmhello 已提交
1103
  return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
1104 1105
}

1106
// Keep the value with the largest timestamp seen so far. The result cell holds the chosen value
// (`bytes` bytes) immediately followed by its timestamp.
int32_t lastFunction(SqlFunctionCtx* pCtx) {
  SResultRowEntryInfo*  pResInfo = GET_RES_INFO(pCtx);
  SInputColumnInfoData* pInput = &pCtx->input;
  SColumnInfoData*      pInputCol = pInput->pData[0];

  char*   buf = GET_ROWCELL_INTERBUF(pResInfo);
  int32_t bytes = pInputCol->info.bytes;
  TSKEY*  pKeptTs = (TSKEY*)(buf + bytes);  // timestamp slot placed right behind the value

  // All null data column, return directly.
  if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) {
    ASSERT(pInputCol->hasNull == true);
    return 0;
  }

  SColumnDataAgg* pColAgg = NULL;
  if (pInput->colDataAggIsSet) {
    pColAgg = pInput->pColumnDataAgg[0];
  }

  // The block order is derived from the timestamps of its first and last row.
  TSKEY headTs = getRowPTs(pInput->pPTS, 0);
  TSKEY tailTs = getRowPTs(pInput->pPTS, pInput->totalRows - 1);
  bool  ascending = (headTs <= tailTs);

  const int32_t firstRow = pInput->startRowIndex;
  const int32_t endRow = firstRow + pInput->numOfRows;  // exclusive
  const int32_t step = ascending ? -1 : 1;

  // Ascending blocks are walked back to front, descending blocks front to back. Only the first
  // non-null row met on that walk is examined; the loop always stops after it.
  int32_t numOfElems = 0;
  for (int32_t row = ascending ? endRow - 1 : firstRow; row >= firstRow && row < endRow; row += step) {
    if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, row, pColAgg)) {
      continue;
    }

    numOfElems++;

    char* data = colDataGetData(pInputCol, row);
    TSKEY cts = getRowPTs(pInput->pPTS, row);

    if (pResInfo->numOfRes == 0 || *pKeptTs < cts) {
      memcpy(buf, data, bytes);
      *pKeptTs = cts;
      pResInfo->numOfRes = 1;
    }

    break;
  }

  SET_VAL(pResInfo, numOfElems, 1);
  return TSDB_CODE_SUCCESS;
}
H
Haojun Liao 已提交
1171

H
Haojun Liao 已提交
1172 1173 1174 1175 1176
// Report the size of the intermediate buffer used by diff: one SDiffInfo. The function node is not
// consulted and the call always succeeds.
bool getDiffFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
  pEnv->calcMemSize = sizeof(SDiffInfo);

  return true;
}

1177
// Reset the diff state: no previous value is known and all option flags are switched off.
// Returns false (leaving the state untouched) when the generic functionSetup() fails.
bool diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
  if (functionSetup(pCtx, pResInfo)) {
    SDiffInfo* pState = GET_ROWCELL_INTERBUF(pResInfo);

    pState->prev.i64 = 0;
    pState->hasPrev = false;
    pState->firstOutput = false;
    pState->includeNull = false;
    pState->ignoreNegative = false;  // TODO set correct param

    return true;
  }

  return false;
}

1191 1192 1193
/*
 * Write the difference between consecutive non-null input values into pCtx->pOutput (and the matching
 * timestamps into pCtx->pTsOutput when both a timestamp output and a timestamp input column exist).
 *
 * The last value seen is carried across blocks in SDiffInfo. Only TSDB_DATA_TYPE_INT and
 * TSDB_DATA_TYPE_BIGINT are handled; every other type falls into the default branch and produces nothing.
 * TODO: port FLOAT/DOUBLE/SMALLINT/TINYINT (the disabled legacy implementation that used to live in an
 * "#if 0" section referred to variables that no longer exist and has been removed).
 *
 * Returns the number of rows produced for this block, or 0 when no previous value is available yet.
 */
int32_t diffFunction(SqlFunctionCtx* pCtx) {
  SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
  SDiffInfo*           pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo);

  SInputColumnInfoData* pInput = &pCtx->input;
  SColumnInfoData*      pInputCol = pInput->pData[0];

  bool    isFirstBlock = (pDiffInfo->hasPrev == false);
  int32_t numOfElems = 0;

  SColumnInfoData* pTsOutput = pCtx->pTsOutput;
  // The timestamp column is optional (see getRowPTs()); do not dereference it blindly.
  TSKEY* tsList = (pInput->pPTS != NULL) ? (TSKEY*)pInput->pPTS->pData : NULL;
  // Timestamps can only be emitted when both the source list and the destination column exist.
  bool emitTs = (pTsOutput != NULL) && (tsList != NULL);

  int32_t startOffset = pCtx->offset;
  switch (pInputCol->info.type) {
    case TSDB_DATA_TYPE_INT: {
      SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
      if (pCtx->order == TSDB_ORDER_ASC) {
        for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
          // In the first block the very first value only seeds "prev", hence the output lags by one.
          int32_t pos = startOffset + (isFirstBlock ? (numOfElems - 1) : numOfElems);
          if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
            if (pDiffInfo->includeNull) {
              // NOTE(review): with includeNull set and a null as the very first row of the first block,
              // pos is startOffset - 1 here; includeNull is always false after diffFunctionSetup().
              colDataSetNull_f(pOutput->nullbitmap, pos);
              if (emitTs) {
                colDataAppendInt64(pTsOutput, pos, &tsList[i]);
              }

              numOfElems += 1;
            }
            continue;
          }

          int32_t v = *(int32_t*)colDataGetData(pInputCol, i);
          if (pDiffInfo->hasPrev) {
            int32_t delta = (int32_t)(v - pDiffInfo->prev.i64);  // direct previous may be null
            if (delta < 0 && pDiffInfo->ignoreNegative) {
              colDataSetNull_f(pOutput->nullbitmap, pos);
            } else {
              colDataAppendInt32(pOutput, pos, &delta);
            }

            if (emitTs) {
              colDataAppendInt64(pTsOutput, pos, &tsList[i]);
            }
          }

          pDiffInfo->prev.i64 = v;
          pDiffInfo->hasPrev = true;
          numOfElems++;
        }
      } else {
        // NOTE(review): null rows are not filtered in this branch.
        for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
          int32_t v = *(int32_t*)colDataGetData(pInputCol, i);
          int32_t pos = startOffset + numOfElems;

          if (pDiffInfo->hasPrev) {
            int32_t delta = -(int32_t)(v - pDiffInfo->prev.i64);  // direct previous may be null
            if (delta < 0 && pDiffInfo->ignoreNegative) {
              colDataSetNull_f(pOutput->nullbitmap, pos);
            } else {
              colDataAppendInt32(pOutput, pos, &delta);
            }

            if (emitTs) {
              colDataAppendInt64(pTsOutput, pos, &tsList[i]);
            }
            pDiffInfo->hasPrev = false;
          }

          if (i < pInput->numOfRows + pInput->startRowIndex - 1) {
            int32_t next = *(int32_t*)colDataGetData(pInputCol, i + 1);

            int32_t delta = v - next;  // direct previous may be null
            colDataAppendInt32(pOutput, pos, &delta);

            if (emitTs) {
              colDataAppendInt64(pTsOutput, pos, &tsList[i]);
            }
          } else {
            // last row of the block: remember it for the first row of the next block
            pDiffInfo->prev.i64 = v;
            if (emitTs) {
              pDiffInfo->prevTs = tsList[i];
            }
            pDiffInfo->hasPrev = true;
          }

          numOfElems++;
        }
      }
      break;
    }

    case TSDB_DATA_TYPE_BIGINT: {
      // Mirrors the ascending TSDB_DATA_TYPE_INT handling above with 64-bit arithmetic. The scan order
      // (pCtx->order) is not taken into account for this type.
      SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
      for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
        if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
          continue;
        }

        // Read the full 64-bit value unconditionally so that the first value seeds "prev" correctly.
        int64_t v = *(int64_t*)colDataGetData(pInputCol, i);
        if (pDiffInfo->hasPrev) {
          int32_t pos = startOffset + (isFirstBlock ? (numOfElems - 1) : numOfElems);
          int64_t delta = v - pDiffInfo->prev.i64;  // direct previous may be null
          if (delta < 0 && pDiffInfo->ignoreNegative) {
            colDataSetNull_f(pOutput->nullbitmap, pos);
          } else {
            colDataAppendInt64(pOutput, pos, &delta);
          }

          if (emitTs) {
            colDataAppendInt64(pTsOutput, pos, &tsList[i]);
          }
        }

        pDiffInfo->prev.i64 = v;
        pDiffInfo->hasPrev = true;
        numOfElems++;
      }
      break;
    }

    default:
      break;
      //      qError("error input type");
  }

  // initial value is not set yet
  if (!pDiffInfo->hasPrev || numOfElems <= 0) {
    /*
     * 1. current block and blocks before are full of null
     * 2. current block may be null value
     */
    assert(pCtx->hasNull);
    return 0;
  } else {
    // the first value of the first block only seeds "prev" and yields no output row
    int32_t forwardStep = (isFirstBlock) ? numOfElems - 1 : numOfElems;
    return forwardStep;
  }
}
H
Haojun Liao 已提交
1458

1459
// Size the intermediate buffer of top/bottom: one STopBotRes header followed by as many STopBotResItem
// entries as the integer value of the second parameter says.
bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
  SValueNode* pLimitNode = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);

  pEnv->calcMemSize = pLimitNode->datum.i * sizeof(STopBotResItem) + sizeof(STopBotRes);

  return true;
}

1465 1466 1467 1468
// Locate the top/bottom state inside the result cell and (re)point its item array at the memory that
// directly follows the STopBotRes header.
static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) {
  STopBotRes* pTopBot = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));

  pTopBot->pItems = (STopBotResItem*)(pTopBot + 1);
  return pTopBot;
}

1473 1474
// Forward declarations of the top/bottom helpers that are defined below their first use.

// stores a complete source row in the buffer pages and records its position in pItem
static void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem);
// overwrites the row previously stored for pItem with another source row
static void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem);

// offers one value of the given type to the bounded result set
static void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
                            uint64_t uid, SResultRowEntryInfo* pEntryInfo);

1479 1480 1481
// Offer every non-null value of the current input rows to the top-N result set.
int32_t topFunction(SqlFunctionCtx* pCtx) {
  SResultRowEntryInfo*  pResInfo = GET_RES_INFO(pCtx);
  SInputColumnInfoData* pInput = &pCtx->input;
  SColumnInfoData*      pCol = pInput->pData[0];
  int32_t               type = pCol->info.type;

  int32_t       row = pInput->startRowIndex;
  const int32_t endRow = row + pInput->numOfRows;  // exclusive

  for (; row < endRow; ++row) {
    bool isNull = pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, row);
    if (!isNull) {
      char* data = colDataGetData(pCol, row);
      doAddIntoResult(pCtx, data, row, pCtx->pSrcBlock, type, pInput->uid, pResInfo);
    }
  }

  return TSDB_CODE_SUCCESS;
}

1508 1509
static int32_t topBotResComparFn(const void* p1, const void* p2, const void* param) {
  uint16_t type = *(uint16_t*)param;
1510

1511 1512
  STopBotResItem* val1 = (STopBotResItem*)p1;
  STopBotResItem* val2 = (STopBotResItem*)p2;
1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534

  if (IS_SIGNED_NUMERIC_TYPE(type)) {
    if (val1->v.i == val2->v.i) {
      return 0;
    }

    return (val1->v.i > val2->v.i) ? 1 : -1;
  } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
    if (val1->v.u == val2->v.u) {
      return 0;
    }

    return (val1->v.u > val2->v.u) ? 1 : -1;
  }

  if (val1->v.d == val2->v.d) {
    return 0;
  }

  return (val1->v.d > val2->v.d) ? 1 : -1;
}

1535 1536
// Offer one value to the bounded result set kept in the result cell.
//
// While fewer entries than the limit (second function parameter) are stored, the value is appended, its
// source row is saved and the items are re-sorted into a heap. Once the set is full, the value replaces
// the item at index 0 only when it is greater than that item, after which the heap is adjusted.
void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
                     uint64_t uid, SResultRowEntryInfo* pEntryInfo) {
  STopBotRes*     pRes = getTopBotOutputInfo(pCtx);
  int32_t         capacity = pCtx->param[1].param.i;
  STopBotResItem* pHeap = pRes->pItems;

  SVariant candidate = {0};
  taosVariantCreateFromBinary(&candidate, pData, tDataTypes[type].bytes, type);

  assert(pHeap != NULL);

  if (pEntryInfo->numOfRes >= capacity) {
    // the set is full: the candidate must beat the item at the root of the heap
    STopBotResItem* pRoot = &pHeap[0];

    bool beatsRoot = (IS_SIGNED_NUMERIC_TYPE(type) && candidate.i > pRoot->v.i) ||
                     (IS_UNSIGNED_NUMERIC_TYPE(type) && candidate.u > pRoot->v.u) ||
                     (IS_FLOAT_TYPE(type) && candidate.d > pRoot->v.d);
    if (!beatsRoot) {
      return;
    }

    // replace the old data and the corresponding tuple data
    pRoot->v = candidate;
    pRoot->uid = uid;

    // the stored row of the replaced item is overwritten in place
    copyTupleData(pCtx, rowIndex, pSrcBlock, pRoot);

    taosheapadjust((void*)pHeap, sizeof(STopBotResItem), 0, pEntryInfo->numOfRes - 1, (const void*)&type,
                   topBotResComparFn, NULL, false);
    return;
  }

  // not full yet: take the next free slot
  STopBotResItem* pSlot = &pHeap[pEntryInfo->numOfRes];
  pSlot->v = candidate;
  pSlot->uid = uid;

  // keep the data of this row in the buffer pages
  saveTupleData(pCtx, rowIndex, pSrcBlock, pSlot);

  pEntryInfo->numOfRes++;
  taosheapsort((void*)pHeap, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn,
               false);
}
1575

1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629
/*
 * Append the complete row `rowIndex` of pSrcBlock to the current buffer page (a new page is requested
 * when there is none yet or the row does not fit) and record its position in pItem->tuplePos.
 *
 * Stored layout: [one bool null flag per column][column 0][column 1]...
 * Column i always starts at the sum of info.bytes of all preceding columns, whether or not those
 * columns are null. topBotFinalize() locates a column with exactly that sum, so the offset has to
 * advance for null columns too.
 */
void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem) {
  SFilePage* pPage = NULL;

  int32_t completeRowSize = pSrcBlock->info.rowSize + pSrcBlock->info.numOfCols * sizeof(bool);

  // NOTE(review): the pages returned by getNewBufPage()/getBufPage() are used without a NULL check.
  if (pCtx->curBufPage == -1) {
    pPage = getNewBufPage(pCtx->pBuf, 0, &pCtx->curBufPage);
    pPage->num = sizeof(SFilePage);
  } else {
    pPage = getBufPage(pCtx->pBuf, pCtx->curBufPage);
    if (pPage->num + completeRowSize > getBufPageSize(pCtx->pBuf)) {
      pPage = getNewBufPage(pCtx->pBuf, 0, &pCtx->curBufPage);
      pPage->num = sizeof(SFilePage);
    }
  }

  pItem->tuplePos.pageId = pCtx->curBufPage;

  // keep the current row data, extract method
  int32_t offset = 0;
  bool*   nullList = (bool*)((char*)pPage + pPage->num);
  // the column data starts right behind the null flags (byte arithmetic, independent of sizeof(bool))
  char* pStart = (char*)nullList + pSrcBlock->info.numOfCols * sizeof(bool);
  for (int32_t i = 0; i < pSrcBlock->info.numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, i);
    bool             isNull = colDataIsNull_s(pCol, rowIndex);

    // always write the flag: the page memory may hold stale content
    nullList[i] = isNull;

    if (!isNull) {
      char* p = colDataGetData(pCol, rowIndex);
      if (IS_VAR_DATA_TYPE(pCol->info.type)) {
        memcpy(pStart + offset, p, varDataTLen(p));
      } else {
        memcpy(pStart + offset, p, pCol->info.bytes);
      }
    }

    // a null column still occupies its slot, otherwise all following columns would be misplaced
    offset += pCol->info.bytes;
  }

  pItem->tuplePos.offset = pPage->num;
  pPage->num += completeRowSize;

  setBufPageDirty(pPage, true);
  releaseBufPage(pCtx->pBuf, pPage);
}

/*
 * Overwrite the row previously stored for pItem (at pItem->tuplePos) with row `rowIndex` of pSrcBlock.
 *
 * Stored layout: [one bool null flag per column][column 0][column 1]...
 * Column i always starts at the sum of info.bytes of all preceding columns, whether or not those
 * columns are null. topBotFinalize() locates a column with exactly that sum, so the offset has to
 * advance for null columns too.
 */
void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem) {
  SFilePage* pPage = getBufPage(pCtx->pBuf, pItem->tuplePos.pageId);

  bool* nullList = (bool*)((char*)pPage + pItem->tuplePos.offset);
  // the column data starts right behind the null flags (byte arithmetic, independent of sizeof(bool))
  char* pStart = (char*)nullList + pSrcBlock->info.numOfCols * sizeof(bool);

  int32_t offset = 0;
  for (int32_t i = 0; i < pSrcBlock->info.numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, i);
    bool             isNull = colDataIsNull_s(pCol, rowIndex);

    // refresh the flag in both directions: the slot may have held a null/non-null value before
    nullList[i] = isNull;

    if (!isNull) {
      char* p = colDataGetData(pCol, rowIndex);
      if (IS_VAR_DATA_TYPE(pCol->info.type)) {
        memcpy(pStart + offset, p, varDataTLen(p));
      } else {
        memcpy(pStart + offset, p, pCol->info.bytes);
      }
    }

    // a null column still occupies its slot, otherwise all following columns would be misplaced
    offset += pCol->info.bytes;
  }

  setBufPageDirty(pPage, true);
  releaseBufPage(pCtx->pBuf, pPage);
}

/*
 * Emit the collected top/bottom items into pBlock, starting at row pBlock->info.rows.
 *
 * For every item the value goes into the column of this function's result slot. When a source row was
 * saved for the item, each subsidiary function gets the value of its source column from that saved row
 * written into its own result column.
 *
 * Only TSDB_DATA_TYPE_INT is handled at the moment; other types produce no rows.
 * Returns pEntryInfo->numOfRes.
 */
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
  SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
  // go through getTopBotOutputInfo() so that pItems is re-pointed at the items inside this result cell
  STopBotRes* pRes = getTopBotOutputInfo(pCtx);
  pEntryInfo->complete = true;

  int32_t          type = pCtx->input.pData[0]->info.type;
  int32_t          slotId = pCtx->pExpr->base.resSchema.slotId;
  SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);

  // todo assign the tag value and the corresponding row data
  int32_t currentRow = pBlock->info.rows;
  switch (type) {
    case TSDB_DATA_TYPE_INT: {
      for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) {
        STopBotResItem* pItem = &pRes->pItems[i];

        // convert by value: reading the 64-bit field through an int32_t pointer depends on byte order
        int32_t v = (int32_t)pItem->v.i;
        colDataAppendInt32(pCol, currentRow, &v);

        int32_t pageId = pItem->tuplePos.pageId;
        int32_t offset = pItem->tuplePos.offset;
        if (pageId != -1) {
          SFilePage* pPage = getBufPage(pCtx->pBuf, pageId);

          bool* nullList = (bool*)((char*)pPage + offset);
          char* pStart = (char*)nullList + pCtx->pSrcBlock->info.numOfCols * sizeof(bool);

          // todo set the offset value to optimize the performance.
          for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
            SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];

            SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0];
            int32_t      srcSlotId = pFuncParam->pCol->slotId;
            // the destination is the subsidiary's own result column, not the column of the top value
            int32_t dstSlotId = pc->pExpr->base.resSchema.slotId;

            // byte offset of the source column inside the saved row
            int32_t ps = 0;
            for (int32_t k = 0; k < srcSlotId; ++k) {
              SColumnInfoData* pSrcCol = taosArrayGet(pCtx->pSrcBlock->pDataBlock, k);
              ps += pSrcCol->info.bytes;
            }

            SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
            if (nullList[srcSlotId]) {
              colDataAppendNULL(pDstCol, currentRow);
            } else {
              colDataAppend(pDstCol, currentRow, (pStart + ps), false);
            }
          }

          // give the page back, as saveTupleData()/copyTupleData() do after their getBufPage()
          releaseBufPage(pCtx->pBuf, pPage);
        }

        currentRow += 1;
      }

      break;
    }

    default:
      break;
  }

  return pEntryInfo->numOfRes;
}
G
Ganlin Zhao 已提交
1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765 1766 1767 1768 1769 1770 1771 1772 1773 1774 1775 1776 1777 1778 1779 1780 1781 1782 1783 1784 1785 1786 1787 1788 1789 1790 1791 1792 1793 1794 1795 1796 1797 1798 1799 1800 1801 1802 1803 1804

/*
 * Report how much intermediate result memory the spread function needs.
 *
 * pFunc is unused. pEnv->calcMemSize is set to the size of one SSpreadInfo.
 * Always returns true.
 */
bool getSpreadFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
  const size_t spreadBufSize = sizeof(SSpreadInfo);

  pEnv->calcMemSize = spreadBufSize;
  return true;
}

/*
 * Prepare the SSpreadInfo result buffer before any input is accumulated.
 *
 * Runs the common functionSetup() first; when that reports failure nothing
 * else is touched and false is returned. Otherwise min is seeded with DBL_MAX
 * and max with -DBL_MAX, so the first value compared against them replaces
 * both, and hasResult is cleared. Returns true on success.
 */
bool spreadFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
  bool ready = functionSetup(pCtx, pResultInfo);

  if (ready) {
    SSpreadInfo* pSpread = GET_ROWCELL_INTERBUF(pResultInfo);

    pSpread->hasResult = false;
    SET_DOUBLE_VAL(&pSpread->max, -DBL_MAX);
    SET_DOUBLE_VAL(&pSpread->min, DBL_MAX);
    ready = true;
  }

  return ready;
}

/*
 * Fold the current input block of pCtx into the running minimum and maximum
 * kept in the SSpreadInfo result buffer.
 *
 * Two input paths are handled:
 *   - pInput->colDataAggIsSet: the min/max stored in the column's
 *     pre-computed aggregate (pColumnDataAgg[0]) are used and the number of
 *     non-null values is numOfRows minus the aggregate's numOfNull.
 *   - otherwise: rows [startRowIndex, startRowIndex + numOfRows) of pData[0]
 *     are scanned one by one, skipping rows flagged in the null bitmap.
 *
 * The number of non-null values seen in this block is handed to SET_VAL(),
 * and hasResult is set once at least one such value was seen.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t spreadFunction(SqlFunctionCtx *pCtx) {
  int32_t numOfElems = 0;

  SInputColumnInfoData* pInput = &pCtx->input;
  int32_t type = pInput->pData[0]->info.type;

  SSpreadInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));

  if (pInput->colDataAggIsSet) {
    // only the pre-computed aggregate is consulted on this path, so it is
    // read here rather than unconditionally at the top of the function
    SColumnDataAgg* pAgg = pInput->pColumnDataAgg[0];

    numOfElems = pInput->numOfRows - pAgg->numOfNull;
    if (numOfElems == 0) {
      goto _spread_over;
    }

    // NOTE(review): a type outside the three families below leaves tmin and
    // tmax at 0.0, which would then be merged into min/max - confirm that
    // callers never reach this path with such a type.
    double tmin = 0.0, tmax = 0.0;
    if (IS_SIGNED_NUMERIC_TYPE(type)) {
      tmin = (double)GET_INT64_VAL(&pAgg->min);
      tmax = (double)GET_INT64_VAL(&pAgg->max);
    } else if (IS_FLOAT_TYPE(type)) {
      tmin = GET_DOUBLE_VAL(&pAgg->min);
      tmax = GET_DOUBLE_VAL(&pAgg->max);
    } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
      tmin = (double)GET_UINT64_VAL(&pAgg->min);
      tmax = (double)GET_UINT64_VAL(&pAgg->max);
    }

    if (GET_DOUBLE_VAL(&pInfo->min) > tmin) {
      SET_DOUBLE_VAL(&pInfo->min, tmin);
    }

    if (GET_DOUBLE_VAL(&pInfo->max) < tmax) {
      SET_DOUBLE_VAL(&pInfo->max, tmax);
    }

  } else {  // computing based on the true data block
    SColumnInfoData* pCol = pInput->pData[0];

    int32_t start = pInput->startRowIndex;
    int32_t end   = start + pInput->numOfRows;  // one past the last row to visit

    // check the valid data one by one
    for (int32_t i = start; i < end; ++i) {
      if (colDataIsNull_f(pCol->nullbitmap, i)) {
        continue;
      }

      char *data = colDataGetData(pCol, i);

      // every supported column type is widened to double before comparing
      double v = 0;
      GET_TYPED_DATA(v, double, type, data);
      if (v < GET_DOUBLE_VAL(&pInfo->min)) {
        SET_DOUBLE_VAL(&pInfo->min, v);
      }

      if (v > GET_DOUBLE_VAL(&pInfo->max)) {
        SET_DOUBLE_VAL(&pInfo->max, v);
      }

      numOfElems += 1;
    }
  }

_spread_over:
  // numOfElems is 0 when every value in this block is null; hasResult is then
  // left as it was
  SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
  if (numOfElems > 0) {
    pInfo->hasResult = true;
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Produce the final spread value for the result row of pCtx.
 *
 * When the accumulated state is flagged as holding a result, result is set
 * to max - min; otherwise result is left untouched. The common
 * functionFinalize() is then called in either case and its return value is
 * passed back to the caller.
 */
int32_t spreadFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
  SSpreadInfo* pSpread = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));

  // the difference is only computed once at least one value was accumulated
  if (pSpread->hasResult == true) {
    SET_DOUBLE_VAL(&pSpread->result, pSpread->max - pSpread->min);
  }

  return functionFinalize(pCtx, pBlock);
}