sclfunc.c 23.7 KB
Newer Older
H
Haojun Liao 已提交
1 2 3
#include "function.h"
#include "scalar.h"
#include "tdatablock.h"
H
Haojun Liao 已提交
4
#include "sclInt.h"
D
dapan1121 已提交
5
#include "sclvector.h"
6

G
Ganlin Zhao 已提交
7 8 9 10 11
typedef float (*_float_fn)(float);
typedef double (*_double_fn)(double);
typedef double (*_double_fn_2)(double, double);
typedef int (*_conv_fn)(int);
typedef void (*_trim_fn)(char *, char*, int32_t, int32_t);
G
Ganlin Zhao 已提交
12
typedef int16_t (*_len_fn)(char *, int32_t);
G
Ganlin Zhao 已提交
13

G
Ganlin Zhao 已提交
14
/** Math functions **/
G
Ganlin Zhao 已提交
15 16
double tlog(double v, double base) {
  return log(v) / log(base);
17 18
}

19
int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
H
Haojun Liao 已提交
20 21 22 23 24
  SColumnInfoData *pInputData  = pInput->columnData;
  SColumnInfoData *pOutputData = pOutput->columnData;

  int32_t type = GET_PARAM_TYPE(pInput);
  if (!IS_NUMERIC_TYPE(type)) {
25 26
    return TSDB_CODE_FAILED;
  }
27

H
Haojun Liao 已提交
28
  switch (type) {
29
    case TSDB_DATA_TYPE_FLOAT: {
H
Haojun Liao 已提交
30 31
      float *in  = (float *)pInputData->pData;
      float *out = (float *)pOutputData->pData;
32
      for (int32_t i = 0; i < pInput->numOfRows; ++i) {
H
Haojun Liao 已提交
33
        if (colDataIsNull_f(pInputData->nullbitmap, i)) {
34
          colDataSetNull_f(pOutputData->nullbitmap, i);
H
Haojun Liao 已提交
35 36
          continue;
        }
37
        out[i] = (in[i] >= 0)? in[i] : -in[i];
38
      }
H
Haojun Liao 已提交
39
      break;
40 41 42
    }

    case TSDB_DATA_TYPE_DOUBLE: {
H
Haojun Liao 已提交
43 44
      double *in  = (double *)pInputData->pData;
      double *out = (double *)pOutputData->pData;
45
      for (int32_t i = 0; i < pInput->numOfRows; ++i) {
H
Haojun Liao 已提交
46
        if (colDataIsNull_f(pInputData->nullbitmap, i)) {
47
          colDataSetNull_f(pOutputData->nullbitmap, i);
H
Haojun Liao 已提交
48 49
          continue;
        }
50
        out[i] = (in[i] >= 0)? in[i] : -in[i];
H
Haojun Liao 已提交
51 52
      }
      break;
53 54
    }

H
Haojun Liao 已提交
55
    case TSDB_DATA_TYPE_TINYINT: {
H
Haojun Liao 已提交
56 57
      int8_t *in  = (int8_t *)pInputData->pData;
      int8_t *out = (int8_t *)pOutputData->pData;
58
      for (int32_t i = 0; i < pInput->numOfRows; ++i) {
H
Haojun Liao 已提交
59
        if (colDataIsNull_f(pInputData->nullbitmap, i)) {
60
          colDataSetNull_f(pOutputData->nullbitmap, i);
H
Haojun Liao 已提交
61 62
          continue;
        }
63
        out[i] = (in[i] >= 0)? in[i] : -in[i];
64
      }
H
Haojun Liao 已提交
65 66
      break;
    }
67

H
Haojun Liao 已提交
68
    case TSDB_DATA_TYPE_SMALLINT: {
H
Haojun Liao 已提交
69 70
      int16_t *in  = (int16_t *)pInputData->pData;
      int16_t *out = (int16_t *)pOutputData->pData;
71
      for (int32_t i = 0; i < pInput->numOfRows; ++i) {
H
Haojun Liao 已提交
72
        if (colDataIsNull_f(pInputData->nullbitmap, i)) {
73
          colDataSetNull_f(pOutputData->nullbitmap, i);
H
Haojun Liao 已提交
74 75
          continue;
        }
76
        out[i] = (in[i] >= 0)? in[i] : -in[i];
77
      }
H
Haojun Liao 已提交
78 79
      break;
    }
80

H
Haojun Liao 已提交
81
    case TSDB_DATA_TYPE_INT: {
H
Haojun Liao 已提交
82 83
      int32_t *in  = (int32_t *)pInputData->pData;
      int32_t *out = (int32_t *)pOutputData->pData;
84
      for (int32_t i = 0; i < pInput->numOfRows; ++i) {
H
Haojun Liao 已提交
85
        if (colDataIsNull_f(pInputData->nullbitmap, i)) {
86
          colDataSetNull_f(pOutputData->nullbitmap, i);
H
Haojun Liao 已提交
87 88
          continue;
        }
89
        out[i] = (in[i] >= 0)? in[i] : -in[i];
90
      }
H
Haojun Liao 已提交
91 92
      break;
    }
93

H
Haojun Liao 已提交
94
    case TSDB_DATA_TYPE_BIGINT: {
H
Haojun Liao 已提交
95 96
      int64_t *in  = (int64_t *)pInputData->pData;
      int64_t *out = (int64_t *)pOutputData->pData;
97
      for (int32_t i = 0; i < pInput->numOfRows; ++i) {
H
Haojun Liao 已提交
98
        if (colDataIsNull_f(pInputData->nullbitmap, i)) {
99
          colDataSetNull_f(pOutputData->nullbitmap, i);
H
Haojun Liao 已提交
100 101
          continue;
        }
102
        out[i] = (in[i] >= 0)? in[i] : -in[i];
103
      }
H
Haojun Liao 已提交
104
      break;
105 106
    }

H
Haojun Liao 已提交
107 108
    default: {
      colDataAssign(pOutputData, pInputData, pInput->numOfRows);
109 110
    }
  }
111

H
Haojun Liao 已提交
112
  pOutput->numOfRows = pInput->numOfRows;
113
  return TSDB_CODE_SUCCESS;
114 115
}

116
int32_t doScalarFunctionUnique(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _double_fn valFn) {
H
Haojun Liao 已提交
117 118
  int32_t type = GET_PARAM_TYPE(pInput);
  if (inputNum != 1 || !IS_NUMERIC_TYPE(type)) {
119 120 121
    return TSDB_CODE_FAILED;
  }

122
  SColumnInfoData *pInputData = pInput->columnData;
H
Haojun Liao 已提交
123
  SColumnInfoData *pOutputData = pOutput->columnData;
124

125
  _getDoubleValue_fn_t getValueFn = getVectorDoubleValueFn(type);
126

127
  double *out = (double *)pOutputData->pData;
128

129 130 131 132
  for (int32_t i = 0; i < pInput->numOfRows; ++i) {
    if (colDataIsNull_f(pInputData->nullbitmap, i)) {
      colDataSetNull_f(pOutputData->nullbitmap, i);
      continue;
133
    }
134
    out[i] = valFn(getValueFn(pInputData->pData, i));
135
  }
136

H
Haojun Liao 已提交
137
  pOutput->numOfRows = pInput->numOfRows;
138 139 140
  return TSDB_CODE_SUCCESS;
}

G
Ganlin Zhao 已提交
141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
int32_t doScalarFunctionUnique2(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _double_fn_2 valFn) {
  if (inputNum != 2 || !IS_NUMERIC_TYPE(GET_PARAM_TYPE(&pInput[0])) || !IS_NUMERIC_TYPE(GET_PARAM_TYPE(&pInput[1]))) {
    return TSDB_CODE_FAILED;
  }

  SColumnInfoData *pInputData[2];
  SColumnInfoData *pOutputData = pOutput->columnData;
  _getDoubleValue_fn_t getValueFn[2];

  for (int32_t i = 0; i < inputNum; ++i) {
    pInputData[i] = pInput[i].columnData;
    getValueFn[i]= getVectorDoubleValueFn(GET_PARAM_TYPE(&pInput[i]));
  }

  double *out = (double *)pOutputData->pData;

  for (int32_t i = 0; i < pInput->numOfRows; ++i) {
    if (colDataIsNull_f(pInputData[0]->nullbitmap, i) ||
        colDataIsNull_f(pInputData[1]->nullbitmap, 0)) {
      colDataSetNull_f(pOutputData->nullbitmap, i);
      continue;
    }
    out[i] = valFn(getValueFn[0](pInputData[0]->pData, i), getValueFn[1](pInputData[1]->pData, 0));
  }

  pOutput->numOfRows = pInput->numOfRows;
  return TSDB_CODE_SUCCESS;
}

170
int32_t doScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _float_fn f1, _double_fn d1) {
H
Haojun Liao 已提交
171 172
  int32_t type = GET_PARAM_TYPE(pInput);
  if (inputNum != 1 || !IS_NUMERIC_TYPE(type)) {
173 174 175
    return TSDB_CODE_FAILED;
  }

H
Haojun Liao 已提交
176 177
  SColumnInfoData *pInputData  = pInput->columnData;
  SColumnInfoData *pOutputData = pOutput->columnData;
178

H
Haojun Liao 已提交
179 180 181 182
  switch (type) {
    case TSDB_DATA_TYPE_FLOAT: {
      float *in  = (float *)pInputData->pData;
      float *out = (float *)pOutputData->pData;
183

184
      for (int32_t i = 0; i < pInput->numOfRows; ++i) {
H
Haojun Liao 已提交
185
        if (colDataIsNull_f(pInputData->nullbitmap, i)) {
186
          colDataSetNull_f(pOutputData->nullbitmap, i);
H
Haojun Liao 已提交
187 188
          continue;
        }
189
        out[i] = f1(in[i]);
190
      }
H
Haojun Liao 已提交
191 192
      break;
    }
193

H
Haojun Liao 已提交
194 195 196
    case TSDB_DATA_TYPE_DOUBLE: {
      double *in  = (double *)pInputData->pData;
      double *out = (double *)pOutputData->pData;
197

198
      for (int32_t i = 0; i < pInput->numOfRows; ++i) {
H
Haojun Liao 已提交
199
        if (colDataIsNull_f(pInputData->nullbitmap, i)) {
200
          colDataSetNull_f(pOutputData->nullbitmap, i);
H
Haojun Liao 已提交
201 202
          continue;
        }
203
        out[i] = d1(in[i]);
204
      }
H
Haojun Liao 已提交
205 206 207 208 209
      break;
    }

    default: {
      colDataAssign(pOutputData, pInputData, pInput->numOfRows);
210
    }
211 212
  }

H
Haojun Liao 已提交
213
  pOutput->numOfRows = pInput->numOfRows;
214 215
  return TSDB_CODE_SUCCESS;
}
216

G
Ganlin Zhao 已提交
217
/** String functions **/
G
Ganlin Zhao 已提交
218 219 220 221 222 223 224 225 226
int16_t tlength(char *input, int32_t type) {
  return varDataLen(input);
}

int16_t tcharlength(char *input, int32_t type) {
  if (type == TSDB_DATA_TYPE_VARCHAR) {
    return varDataLen(input);
  } else { //NCHAR
    return varDataLen(input) / TSDB_NCHAR_SIZE;
G
Ganlin Zhao 已提交
227
  }
G
Ganlin Zhao 已提交
228
}
G
Ganlin Zhao 已提交
229

G
Ganlin Zhao 已提交
230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246
void tltrim(char *input, char *output, int32_t type, int32_t charLen) {
  int32_t numOfSpaces = 0;
  if (type == TSDB_DATA_TYPE_VARCHAR) {
    for (int32_t i = 0; i < charLen; ++i) {
      if (!isspace(*(varDataVal(input) + i))) {
        break;
      }
      numOfSpaces++;
    }
  } else { //NCHAR
    for (int32_t i = 0; i < charLen; ++i) {
      if (!iswspace(*((uint32_t *)varDataVal(input) + i))) {
        break;
      }
      numOfSpaces++;
    }
  }
G
Ganlin Zhao 已提交
247

G
Ganlin Zhao 已提交
248 249 250 251 252 253 254 255
  int32_t resLen;
  if (type == TSDB_DATA_TYPE_VARCHAR) {
    resLen = charLen - numOfSpaces;
    memcpy(varDataVal(output), varDataVal(input) + numOfSpaces, resLen);
  } else {
    resLen = (charLen - numOfSpaces) * TSDB_NCHAR_SIZE;
    memcpy(varDataVal(output), varDataVal(input) + numOfSpaces * TSDB_NCHAR_SIZE, resLen);
  }
G
Ganlin Zhao 已提交
256

G
Ganlin Zhao 已提交
257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274
  varDataSetLen(output, resLen);
}

void trtrim(char *input, char *output, int32_t type, int32_t charLen) {
  int32_t numOfSpaces = 0;
  if (type == TSDB_DATA_TYPE_VARCHAR) {
    for (int32_t i = charLen - 1; i >= 0; --i) {
      if (!isspace(*(varDataVal(input) + i))) {
        break;
      }
      numOfSpaces++;
    }
  } else { //NCHAR
    for (int32_t i = charLen - 1; i < charLen; ++i) {
      if (!iswspace(*((uint32_t *)varDataVal(input) + i))) {
        break;
      }
      numOfSpaces++;
G
Ganlin Zhao 已提交
275
    }
G
Ganlin Zhao 已提交
276
  }
G
Ganlin Zhao 已提交
277

G
Ganlin Zhao 已提交
278 279 280 281 282
  int32_t resLen;
  if (type == TSDB_DATA_TYPE_VARCHAR) {
    resLen = charLen - numOfSpaces;
  } else {
    resLen = (charLen - numOfSpaces) * TSDB_NCHAR_SIZE;
G
Ganlin Zhao 已提交
283
  }
G
Ganlin Zhao 已提交
284
  memcpy(varDataVal(output), varDataVal(input), resLen);
G
Ganlin Zhao 已提交
285

G
Ganlin Zhao 已提交
286
  varDataSetLen(output, resLen);
G
Ganlin Zhao 已提交
287 288
}

G
Ganlin Zhao 已提交
289
int32_t doLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, _len_fn lenFn) {
G
Ganlin Zhao 已提交
290 291 292 293 294 295 296 297
  int32_t type = GET_PARAM_TYPE(pInput);
  if (inputNum != 1 || !IS_VAR_DATA_TYPE(type)) {
    return TSDB_CODE_FAILED;
  }

  SColumnInfoData *pInputData  = pInput->columnData;
  SColumnInfoData *pOutputData = pOutput->columnData;

G
Ganlin Zhao 已提交
298
  char *in = pInputData->pData;
G
Ganlin Zhao 已提交
299 300 301 302 303 304 305 306
  int16_t *out = (int16_t *)pOutputData->pData;

  for (int32_t i = 0; i < pInput->numOfRows; ++i) {
    if (colDataIsNull_f(pInputData->nullbitmap, i)) {
      colDataSetNull_f(pOutputData->nullbitmap, i);
      continue;
    }

G
Ganlin Zhao 已提交
307 308
    out[i] = lenFn(in, type);
    in += varDataTLen(in);
G
Ganlin Zhao 已提交
309 310 311 312 313
  }

  pOutput->numOfRows = pInput->numOfRows;
  return TSDB_CODE_SUCCESS;
}
314 315 316 317 318 319 320

static void setVarTypeOutputBuf(SColumnInfoData *pOutputData, int32_t len, int32_t type) {
  pOutputData->pData = taosMemoryCalloc(len, sizeof(char));
  pOutputData->info.type = type;
  pOutputData->info.bytes = len;
  pOutputData->varmeta.length = len;
  pOutputData->varmeta.allocLen = len;
321
}
G
Ganlin Zhao 已提交
322 323 324 325 326 327 328 329

int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  if (inputNum < 2 || inputNum > 8) { // concat accpet 2-8 input strings
    return TSDB_CODE_FAILED;
  }

  SColumnInfoData **pInputData = taosMemoryCalloc(inputNum, sizeof(SColumnInfoData *));
  SColumnInfoData *pOutputData = pOutput->columnData;
330
  char **input = taosMemoryCalloc(inputNum, POINTER_BYTES);
331
  char *outputBuf = NULL;
G
Ganlin Zhao 已提交
332

333
  int32_t inputLen = 0;
334 335
  int32_t numOfRows = 0;
  for (int32_t i = 0; i < inputNum; ++i) {
336 337 338 339
    if (!IS_VAR_DATA_TYPE(GET_PARAM_TYPE(&pInput[i])) ||
        GET_PARAM_TYPE(&pInput[i]) != GET_PARAM_TYPE(&pInput[0])) {
      return TSDB_CODE_FAILED;
    }
340 341 342 343
    if (pInput[i].numOfRows > numOfRows) {
      numOfRows = pInput[i].numOfRows;
    }
  }
G
Ganlin Zhao 已提交
344 345
  for (int32_t i = 0; i < inputNum; ++i) {
    pInputData[i] = pInput[i].columnData;
346
    input[i] = pInputData[i]->pData;
347 348 349 350 351
    if (pInput[i].numOfRows == 1) {
      inputLen += (pInputData[i]->varmeta.length - VARSTR_HEADER_SIZE) * numOfRows;
    } else {
      inputLen += pInputData[i]->varmeta.length - numOfRows * VARSTR_HEADER_SIZE;
    }
352 353
  }

354 355 356
  int32_t outputLen = inputLen + numOfRows * VARSTR_HEADER_SIZE;
  outputBuf = taosMemoryCalloc(outputLen, 1);
  char *output = outputBuf;
G
Ganlin Zhao 已提交
357 358

  bool hasNull = false;
359
  for (int32_t k = 0; k < numOfRows; ++k) {
G
Ganlin Zhao 已提交
360
    for (int32_t i = 0; i < inputNum; ++i) {
361 362
      if (colDataIsNull_s(pInputData[i], k)) {
        colDataAppendNULL(pOutputData, k);
G
Ganlin Zhao 已提交
363 364 365 366 367 368 369 370 371 372 373
        hasNull = true;
        break;
      }
    }

    if (hasNull) {
      continue;
    }

    int16_t dataLen = 0;
    for (int32_t i = 0; i < inputNum; ++i) {
374 375
      memcpy(varDataVal(output) + dataLen, varDataVal(input[i]), varDataLen(input[i]));
      dataLen += varDataLen(input[i]);
376 377 378
      if (pInput[i].numOfRows != 1) {
        input[i] += varDataTLen(input[i]);
      }
G
Ganlin Zhao 已提交
379
    }
380
    varDataSetLen(output, dataLen);
381 382
    colDataAppend(pOutputData, k, output, false);
    output += varDataTLen(output);
G
Ganlin Zhao 已提交
383 384
  }

385
  pOutput->numOfRows = numOfRows;
386
  taosMemoryFree(input);
387
  taosMemoryFree(outputBuf);
G
Ganlin Zhao 已提交
388 389 390 391 392 393 394 395 396 397 398 399
  taosMemoryFree(pInputData);

  return TSDB_CODE_SUCCESS;
}

int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  if (inputNum < 3 || inputNum > 9) { // concat accpet 3-9 input strings including the separator
    return TSDB_CODE_FAILED;
  }

  SColumnInfoData **pInputData = taosMemoryCalloc(inputNum, sizeof(SColumnInfoData *));
  SColumnInfoData *pOutputData = pOutput->columnData;
400
  char **input = taosMemoryCalloc(inputNum, POINTER_BYTES);
401
  char *outputBuf = NULL;
G
Ganlin Zhao 已提交
402

403
  int32_t inputLen = 0;
404
  int32_t numOfRows = 0;
405 406 407 408 409
  for (int32_t i = 1; i < inputNum; ++i) {
    if (!IS_VAR_DATA_TYPE(GET_PARAM_TYPE(&pInput[i])) ||
        GET_PARAM_TYPE(&pInput[i]) != GET_PARAM_TYPE(&pInput[1])) {
      return TSDB_CODE_FAILED;
    }
410 411 412 413
    if (pInput[i].numOfRows > numOfRows) {
      numOfRows = pInput[i].numOfRows;
    }
  }
G
Ganlin Zhao 已提交
414 415
  for (int32_t i = 0; i < inputNum; ++i) {
    pInputData[i] = pInput[i].columnData;
416 417
    if (i == 0) {
      // calculate required separator space
418 419
      int32_t factor =  (GET_PARAM_TYPE(&pInput[1]) == TSDB_DATA_TYPE_NCHAR) ? TSDB_NCHAR_SIZE : 1;
      inputLen += (pInputData[0]->varmeta.length - VARSTR_HEADER_SIZE) * numOfRows * (inputNum - 2) * factor;
420 421
    } else if (pInput[i].numOfRows == 1) {
      inputLen += (pInputData[i]->varmeta.length - VARSTR_HEADER_SIZE) * numOfRows;
422 423 424 425 426 427
    } else {
      inputLen += pInputData[i]->varmeta.length - numOfRows * VARSTR_HEADER_SIZE;
    }
    input[i] = pInputData[i]->pData;
  }

428 429 430
  int32_t outputLen = inputLen + numOfRows * VARSTR_HEADER_SIZE;
  outputBuf = taosMemoryCalloc(outputLen, 1);
  char *output = outputBuf;
G
Ganlin Zhao 已提交
431

432
  for (int32_t k = 0; k < numOfRows; ++k) {
433 434
    if (colDataIsNull_s(pInputData[0], k)) {
      colDataAppendNULL(pOutputData, k);
G
Ganlin Zhao 已提交
435 436 437 438 439
      continue;
    }

    int16_t dataLen = 0;
    for (int32_t i = 1; i < inputNum; ++i) {
440
      if (colDataIsNull_s(pInputData[i], k)) {
G
Ganlin Zhao 已提交
441 442 443
        continue;
      }

444 445
      memcpy(varDataVal(output) + dataLen, varDataVal(input[i]), varDataLen(input[i]));
      dataLen += varDataLen(input[i]);
446 447 448
      if (pInput[i].numOfRows != 1) {
        input[i] += varDataTLen(input[i]);
      }
G
Ganlin Zhao 已提交
449 450 451

      if (i < inputNum - 1) {
        //insert the separator
452
        char *sep = pInputData[0]->pData;
453
        memcpy(varDataVal(output) + dataLen, varDataVal(sep), varDataLen(sep));
G
Ganlin Zhao 已提交
454 455 456
        dataLen += varDataLen(sep);
      }
    }
457
    varDataSetLen(output, dataLen);
458 459
    colDataAppend(pOutputData, k, output, false);
    output += varDataTLen(output);
G
Ganlin Zhao 已提交
460 461
  }

462 463
  pOutput->numOfRows = numOfRows;
  taosMemoryFree(input);
464
  taosMemoryFree(outputBuf);
G
Ganlin Zhao 已提交
465 466 467 468 469 470 471 472 473 474 475 476 477 478
  taosMemoryFree(pInputData);

  return TSDB_CODE_SUCCESS;
}

int32_t doCaseConvFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, _conv_fn convFn) {
  int32_t type = GET_PARAM_TYPE(pInput);
  if (inputNum != 1 || !IS_VAR_DATA_TYPE(type)) {
    return TSDB_CODE_FAILED;
  }

  SColumnInfoData *pInputData  = pInput->columnData;
  SColumnInfoData *pOutputData = pOutput->columnData;

479
  char *input  = pInputData->pData;
480 481 482 483 484
  char *output = NULL;

  int32_t outputLen = pInputData->varmeta.length;
  char *outputBuf = taosMemoryCalloc(outputLen, 1);
  output = outputBuf;
485

G
Ganlin Zhao 已提交
486
  for (int32_t i = 0; i < pInput->numOfRows; ++i) {
487 488
    if (colDataIsNull_s(pInputData, i)) {
      colDataAppendNULL(pOutputData, i);
G
Ganlin Zhao 已提交
489 490 491
      continue;
    }

492
    int32_t len = varDataLen(input);
G
Ganlin Zhao 已提交
493 494
    if (type == TSDB_DATA_TYPE_VARCHAR) {
      for (int32_t j = 0; j < len; ++j) {
495
        *(varDataVal(output) + j) = convFn(*(varDataVal(input) + j));
G
Ganlin Zhao 已提交
496 497 498
      }
    } else { //NCHAR
      for (int32_t j = 0; j < len / TSDB_NCHAR_SIZE; ++j) {
499
        *((uint32_t *)varDataVal(output) + j) = convFn(*((uint32_t *)varDataVal(input) + j));
G
Ganlin Zhao 已提交
500 501
      }
    }
502
    varDataSetLen(output, len);
503
    colDataAppend(pOutputData, i, output, false);
504 505
    input += varDataTLen(input);
    output += varDataTLen(output);
G
Ganlin Zhao 已提交
506 507 508
  }

  pOutput->numOfRows = pInput->numOfRows;
509
  taosMemoryFree(outputBuf);
G
Ganlin Zhao 已提交
510 511 512 513 514 515 516 517 518 519 520 521 522 523

  return TSDB_CODE_SUCCESS;
}


int32_t doTrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, _trim_fn trimFn) {
  int32_t type = GET_PARAM_TYPE(pInput);
  if (inputNum != 1 || !IS_VAR_DATA_TYPE(type)) {
    return TSDB_CODE_FAILED;
  }

  SColumnInfoData *pInputData  = pInput->columnData;
  SColumnInfoData *pOutputData = pOutput->columnData;

524
  char *input  = pInputData->pData;
525 526 527 528 529
  char *output = NULL;

  int32_t outputLen = pInputData->varmeta.length;
  char *outputBuf = taosMemoryCalloc(outputLen, 1);
  output = outputBuf;
530

G
Ganlin Zhao 已提交
531
  for (int32_t i = 0; i < pInput->numOfRows; ++i) {
532 533
    if (colDataIsNull_s(pInputData, i)) {
      colDataAppendNULL(pOutputData, i);
G
Ganlin Zhao 已提交
534 535 536
      continue;
    }

537
    int32_t len = varDataLen(input);
G
Ganlin Zhao 已提交
538
    int32_t charLen = (type == TSDB_DATA_TYPE_VARCHAR) ? len : len / TSDB_NCHAR_SIZE;
539 540 541
    trimFn(input, output, type, charLen);

    varDataSetLen(output, len);
542
    colDataAppend(pOutputData, i, output, false);
543 544
    input += varDataTLen(input);
    output += varDataTLen(output);
G
Ganlin Zhao 已提交
545 546 547
  }

  pOutput->numOfRows = pInput->numOfRows;
548
  taosMemoryFree(outputBuf);
G
Ganlin Zhao 已提交
549 550 551 552 553

  return TSDB_CODE_SUCCESS;
}

int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
554
  if (inputNum != 2 && inputNum!= 3) {
G
Ganlin Zhao 已提交
555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575
    return TSDB_CODE_FAILED;
  }

  int32_t subPos = 0;
  GET_TYPED_DATA(subPos, int32_t, GET_PARAM_TYPE(&pInput[1]), pInput[1].columnData->pData);
  if (subPos == 0) { //subPos needs to be positive or negative values;
    return TSDB_CODE_FAILED;
  }

  int32_t subLen = INT16_MAX;
  if (inputNum == 3) {
    GET_TYPED_DATA(subLen, int32_t, GET_PARAM_TYPE(&pInput[2]), pInput[2].columnData->pData);
    if (subLen < 0) { //subLen cannot be negative
      return TSDB_CODE_FAILED;
    }
    subLen = (GET_PARAM_TYPE(pInput) == TSDB_DATA_TYPE_VARCHAR) ? subLen : subLen * TSDB_NCHAR_SIZE;
  }

  SColumnInfoData *pInputData  = pInput->columnData;
  SColumnInfoData *pOutputData = pOutput->columnData;

576
  char *input  = pInputData->pData;
577 578 579 580 581
  char *output = NULL;

  int32_t outputLen = pInputData->varmeta.length;
  char *outputBuf = taosMemoryCalloc(outputLen, 1);
  output = outputBuf;
582 583

  for (int32_t i = 0; i < pInput->numOfRows; ++i) {
584 585
    if (colDataIsNull_s(pInputData, i)) {
      colDataAppendNULL(pOutputData, i);
G
Ganlin Zhao 已提交
586 587 588
      continue;
    }

589
    int32_t len = varDataLen(input);
G
Ganlin Zhao 已提交
590 591 592 593 594 595 596 597 598 599 600 601
    int32_t startPosBytes;

    if (subPos > 0) {
      startPosBytes = (GET_PARAM_TYPE(pInput) == TSDB_DATA_TYPE_VARCHAR) ? subPos - 1 : (subPos - 1) * TSDB_NCHAR_SIZE;
      startPosBytes = MIN(startPosBytes, len);
    } else {
      startPosBytes = (GET_PARAM_TYPE(pInput) == TSDB_DATA_TYPE_VARCHAR) ? len + subPos : len + subPos * TSDB_NCHAR_SIZE;
      startPosBytes = MAX(startPosBytes, 0);
    }

    subLen = MIN(subLen, len - startPosBytes);
    if (subLen > 0) {
602
      memcpy(varDataVal(output), varDataVal(input) + startPosBytes, subLen);
G
Ganlin Zhao 已提交
603 604
    }

605
    varDataSetLen(output, subLen);
606
    colDataAppend(pOutputData, i , output, false);
607 608
    input += varDataTLen(input);
    output += varDataTLen(output);
G
Ganlin Zhao 已提交
609 610 611
  }

  pOutput->numOfRows = pInput->numOfRows;
612
  taosMemoryFree(outputBuf);
G
Ganlin Zhao 已提交
613 614 615 616 617

  return TSDB_CODE_SUCCESS;
}


618 619 620
int32_t atanFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doScalarFunctionUnique(pInput, inputNum, pOutput, atan);
}
621

622 623 624
int32_t sinFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doScalarFunctionUnique(pInput, inputNum, pOutput, sin);
}
625

626 627 628
int32_t cosFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doScalarFunctionUnique(pInput, inputNum, pOutput, cos);
}
629

630 631 632
int32_t tanFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doScalarFunctionUnique(pInput, inputNum, pOutput, tan);
}
633

634 635 636
int32_t asinFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doScalarFunctionUnique(pInput, inputNum, pOutput, asin);
}
637

638 639 640
int32_t acosFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doScalarFunctionUnique(pInput, inputNum, pOutput, acos);
}
H
Haojun Liao 已提交
641

G
Ganlin Zhao 已提交
642 643 644 645 646 647 648 649
int32_t powFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doScalarFunctionUnique2(pInput, inputNum, pOutput, pow);
}

int32_t logFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doScalarFunctionUnique2(pInput, inputNum, pOutput, tlog);
}

650 651 652
int32_t sqrtFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doScalarFunctionUnique(pInput, inputNum, pOutput, sqrt);
}
653

654 655 656 657 658 659 660 661 662 663
int32_t ceilFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doScalarFunction(pInput, inputNum, pOutput, ceilf, ceil);
}

int32_t floorFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doScalarFunction(pInput, inputNum, pOutput, floorf, floor);
}

int32_t roundFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doScalarFunction(pInput, inputNum, pOutput, roundf, round);
664 665
}

G
Ganlin Zhao 已提交
666 667
int32_t lowerFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doCaseConvFunction(pInput, inputNum, pOutput, tolower);
668 669
}

G
Ganlin Zhao 已提交
670 671
int32_t upperFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doCaseConvFunction(pInput, inputNum, pOutput, toupper);
672 673
}

G
Ganlin Zhao 已提交
674 675
int32_t ltrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doTrimFunction(pInput, inputNum, pOutput, tltrim);
676 677
}

G
Ganlin Zhao 已提交
678 679
int32_t rtrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doTrimFunction(pInput, inputNum, pOutput, trtrim);
680 681
}

G
Ganlin Zhao 已提交
682 683 684 685 686 687 688 689
int32_t lengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doLengthFunction(pInput, inputNum, pOutput, tlength);
}

int32_t charLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doLengthFunction(pInput, inputNum, pOutput, tcharlength);
}

H
Haojun Liao 已提交
690
#if 0
691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754
static void reverseCopy(char* dest, const char* src, int16_t type, int32_t numOfRows) {
  switch(type) {
    case TSDB_DATA_TYPE_TINYINT:
    case TSDB_DATA_TYPE_UTINYINT:{
      int8_t* p = (int8_t*) dest;
      int8_t* pSrc = (int8_t*) src;

      for(int32_t i = 0; i < numOfRows; ++i) {
        p[i] = pSrc[numOfRows - i - 1];
      }
      return;
    }

    case TSDB_DATA_TYPE_SMALLINT:
    case TSDB_DATA_TYPE_USMALLINT:{
      int16_t* p = (int16_t*) dest;
      int16_t* pSrc = (int16_t*) src;

      for(int32_t i = 0; i < numOfRows; ++i) {
        p[i] = pSrc[numOfRows - i - 1];
      }
      return;
    }
    case TSDB_DATA_TYPE_INT:
    case TSDB_DATA_TYPE_UINT: {
      int32_t* p = (int32_t*) dest;
      int32_t* pSrc = (int32_t*) src;

      for(int32_t i = 0; i < numOfRows; ++i) {
        p[i] = pSrc[numOfRows - i - 1];
      }
      return;
    }
    case TSDB_DATA_TYPE_BIGINT:
    case TSDB_DATA_TYPE_UBIGINT: {
      int64_t* p = (int64_t*) dest;
      int64_t* pSrc = (int64_t*) src;

      for(int32_t i = 0; i < numOfRows; ++i) {
        p[i] = pSrc[numOfRows - i - 1];
      }
      return;
    }
    case TSDB_DATA_TYPE_FLOAT: {
      float* p = (float*) dest;
      float* pSrc = (float*) src;

      for(int32_t i = 0; i < numOfRows; ++i) {
        p[i] = pSrc[numOfRows - i - 1];
      }
      return;
    }
    case TSDB_DATA_TYPE_DOUBLE: {
      double* p = (double*) dest;
      double* pSrc = (double*) src;

      for(int32_t i = 0; i < numOfRows; ++i) {
        p[i] = pSrc[numOfRows - i - 1];
      }
      return;
    }
    default: assert(0);
  }
}
H
Haojun Liao 已提交
755
#endif
756

757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786
bool getTimePseudoFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
  pEnv->calcMemSize = sizeof(int64_t);
  return true;
}

int32_t qStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  ASSERT(inputNum == 1);
  colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t *)colDataGetData(pInput->columnData, 0));
}

int32_t qEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  ASSERT(inputNum == 1);
  colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t *)colDataGetData(pInput->columnData, 1));
}

int32_t winDurFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  ASSERT(inputNum == 1);
  colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t *)colDataGetData(pInput->columnData, 2));
}

int32_t winStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  ASSERT(inputNum == 1);
  colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t*) colDataGetData(pInput->columnData, 3));
  return TSDB_CODE_SUCCESS;
}

int32_t winEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  ASSERT(inputNum == 1);
  colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t*) colDataGetData(pInput->columnData, 4));
  return TSDB_CODE_SUCCESS;
787
}