sclfunc.c 24.8 KB
Newer Older
H
Haojun Liao 已提交
1 2 3
#include "function.h"
#include "scalar.h"
#include "tdatablock.h"
H
Haojun Liao 已提交
4
#include "sclInt.h"
D
dapan1121 已提交
5
#include "sclvector.h"
6

G
Ganlin Zhao 已提交
7 8 9 10 11
typedef float (*_float_fn)(float);
typedef double (*_double_fn)(double);
typedef double (*_double_fn_2)(double, double);
typedef int (*_conv_fn)(int);
typedef void (*_trim_fn)(char *, char*, int32_t, int32_t);
G
Ganlin Zhao 已提交
12
typedef int16_t (*_len_fn)(char *, int32_t);
G
Ganlin Zhao 已提交
13

G
Ganlin Zhao 已提交
14
/** Math functions **/
15
static double tlog(double v, double base) {
G
Ganlin Zhao 已提交
16
  return log(v) / log(base);
17 18
}

19
int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
H
Haojun Liao 已提交
20 21 22 23 24
  SColumnInfoData *pInputData  = pInput->columnData;
  SColumnInfoData *pOutputData = pOutput->columnData;

  int32_t type = GET_PARAM_TYPE(pInput);
  if (!IS_NUMERIC_TYPE(type)) {
25 26
    return TSDB_CODE_FAILED;
  }
27

H
Haojun Liao 已提交
28
  switch (type) {
29
    case TSDB_DATA_TYPE_FLOAT: {
H
Haojun Liao 已提交
30 31
      float *in  = (float *)pInputData->pData;
      float *out = (float *)pOutputData->pData;
32
      for (int32_t i = 0; i < pInput->numOfRows; ++i) {
H
Haojun Liao 已提交
33
        if (colDataIsNull_f(pInputData->nullbitmap, i)) {
34
          colDataSetNull_f(pOutputData->nullbitmap, i);
H
Haojun Liao 已提交
35 36
          continue;
        }
37
        out[i] = (in[i] >= 0)? in[i] : -in[i];
38
      }
H
Haojun Liao 已提交
39
      break;
40 41 42
    }

    case TSDB_DATA_TYPE_DOUBLE: {
H
Haojun Liao 已提交
43 44
      double *in  = (double *)pInputData->pData;
      double *out = (double *)pOutputData->pData;
45
      for (int32_t i = 0; i < pInput->numOfRows; ++i) {
H
Haojun Liao 已提交
46
        if (colDataIsNull_f(pInputData->nullbitmap, i)) {
47
          colDataSetNull_f(pOutputData->nullbitmap, i);
H
Haojun Liao 已提交
48 49
          continue;
        }
50
        out[i] = (in[i] >= 0)? in[i] : -in[i];
H
Haojun Liao 已提交
51 52
      }
      break;
53 54
    }

H
Haojun Liao 已提交
55
    case TSDB_DATA_TYPE_TINYINT: {
H
Haojun Liao 已提交
56 57
      int8_t *in  = (int8_t *)pInputData->pData;
      int8_t *out = (int8_t *)pOutputData->pData;
58
      for (int32_t i = 0; i < pInput->numOfRows; ++i) {
H
Haojun Liao 已提交
59
        if (colDataIsNull_f(pInputData->nullbitmap, i)) {
60
          colDataSetNull_f(pOutputData->nullbitmap, i);
H
Haojun Liao 已提交
61 62
          continue;
        }
63
        out[i] = (in[i] >= 0)? in[i] : -in[i];
64
      }
H
Haojun Liao 已提交
65 66
      break;
    }
67

H
Haojun Liao 已提交
68
    case TSDB_DATA_TYPE_SMALLINT: {
H
Haojun Liao 已提交
69 70
      int16_t *in  = (int16_t *)pInputData->pData;
      int16_t *out = (int16_t *)pOutputData->pData;
71
      for (int32_t i = 0; i < pInput->numOfRows; ++i) {
H
Haojun Liao 已提交
72
        if (colDataIsNull_f(pInputData->nullbitmap, i)) {
73
          colDataSetNull_f(pOutputData->nullbitmap, i);
H
Haojun Liao 已提交
74 75
          continue;
        }
76
        out[i] = (in[i] >= 0)? in[i] : -in[i];
77
      }
H
Haojun Liao 已提交
78 79
      break;
    }
80

H
Haojun Liao 已提交
81
    case TSDB_DATA_TYPE_INT: {
H
Haojun Liao 已提交
82 83
      int32_t *in  = (int32_t *)pInputData->pData;
      int32_t *out = (int32_t *)pOutputData->pData;
84
      for (int32_t i = 0; i < pInput->numOfRows; ++i) {
H
Haojun Liao 已提交
85
        if (colDataIsNull_f(pInputData->nullbitmap, i)) {
86
          colDataSetNull_f(pOutputData->nullbitmap, i);
H
Haojun Liao 已提交
87 88
          continue;
        }
89
        out[i] = (in[i] >= 0)? in[i] : -in[i];
90
      }
H
Haojun Liao 已提交
91 92
      break;
    }
93

H
Haojun Liao 已提交
94
    case TSDB_DATA_TYPE_BIGINT: {
H
Haojun Liao 已提交
95 96
      int64_t *in  = (int64_t *)pInputData->pData;
      int64_t *out = (int64_t *)pOutputData->pData;
97
      for (int32_t i = 0; i < pInput->numOfRows; ++i) {
H
Haojun Liao 已提交
98
        if (colDataIsNull_f(pInputData->nullbitmap, i)) {
99
          colDataSetNull_f(pOutputData->nullbitmap, i);
H
Haojun Liao 已提交
100 101
          continue;
        }
102
        out[i] = (in[i] >= 0)? in[i] : -in[i];
103
      }
H
Haojun Liao 已提交
104
      break;
105 106
    }

H
Haojun Liao 已提交
107 108
    default: {
      colDataAssign(pOutputData, pInputData, pInput->numOfRows);
109 110
    }
  }
111

H
Haojun Liao 已提交
112
  pOutput->numOfRows = pInput->numOfRows;
113
  return TSDB_CODE_SUCCESS;
114 115
}

116
static int32_t doScalarFunctionUnique(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _double_fn valFn) {
H
Haojun Liao 已提交
117 118
  int32_t type = GET_PARAM_TYPE(pInput);
  if (inputNum != 1 || !IS_NUMERIC_TYPE(type)) {
119 120 121
    return TSDB_CODE_FAILED;
  }

122
  SColumnInfoData *pInputData = pInput->columnData;
H
Haojun Liao 已提交
123
  SColumnInfoData *pOutputData = pOutput->columnData;
124

125
  _getDoubleValue_fn_t getValueFn = getVectorDoubleValueFn(type);
126

127
  double *out = (double *)pOutputData->pData;
128

129 130 131 132
  for (int32_t i = 0; i < pInput->numOfRows; ++i) {
    if (colDataIsNull_f(pInputData->nullbitmap, i)) {
      colDataSetNull_f(pOutputData->nullbitmap, i);
      continue;
133
    }
134
    out[i] = valFn(getValueFn(pInputData->pData, i));
135
  }
136

H
Haojun Liao 已提交
137
  pOutput->numOfRows = pInput->numOfRows;
138 139 140
  return TSDB_CODE_SUCCESS;
}

141
static int32_t doScalarFunctionUnique2(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _double_fn_2 valFn) {
G
Ganlin Zhao 已提交
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
  if (inputNum != 2 || !IS_NUMERIC_TYPE(GET_PARAM_TYPE(&pInput[0])) || !IS_NUMERIC_TYPE(GET_PARAM_TYPE(&pInput[1]))) {
    return TSDB_CODE_FAILED;
  }

  SColumnInfoData *pInputData[2];
  SColumnInfoData *pOutputData = pOutput->columnData;
  _getDoubleValue_fn_t getValueFn[2];

  for (int32_t i = 0; i < inputNum; ++i) {
    pInputData[i] = pInput[i].columnData;
    getValueFn[i]= getVectorDoubleValueFn(GET_PARAM_TYPE(&pInput[i]));
  }

  double *out = (double *)pOutputData->pData;

  for (int32_t i = 0; i < pInput->numOfRows; ++i) {
    if (colDataIsNull_f(pInputData[0]->nullbitmap, i) ||
        colDataIsNull_f(pInputData[1]->nullbitmap, 0)) {
      colDataSetNull_f(pOutputData->nullbitmap, i);
      continue;
    }
    out[i] = valFn(getValueFn[0](pInputData[0]->pData, i), getValueFn[1](pInputData[1]->pData, 0));
  }

  pOutput->numOfRows = pInput->numOfRows;
  return TSDB_CODE_SUCCESS;
}

170
static int32_t doScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _float_fn f1, _double_fn d1) {
H
Haojun Liao 已提交
171 172
  int32_t type = GET_PARAM_TYPE(pInput);
  if (inputNum != 1 || !IS_NUMERIC_TYPE(type)) {
173 174 175
    return TSDB_CODE_FAILED;
  }

H
Haojun Liao 已提交
176 177
  SColumnInfoData *pInputData  = pInput->columnData;
  SColumnInfoData *pOutputData = pOutput->columnData;
178

H
Haojun Liao 已提交
179 180 181 182
  switch (type) {
    case TSDB_DATA_TYPE_FLOAT: {
      float *in  = (float *)pInputData->pData;
      float *out = (float *)pOutputData->pData;
183

184
      for (int32_t i = 0; i < pInput->numOfRows; ++i) {
H
Haojun Liao 已提交
185
        if (colDataIsNull_f(pInputData->nullbitmap, i)) {
186
          colDataSetNull_f(pOutputData->nullbitmap, i);
H
Haojun Liao 已提交
187 188
          continue;
        }
189
        out[i] = f1(in[i]);
190
      }
H
Haojun Liao 已提交
191 192
      break;
    }
193

H
Haojun Liao 已提交
194 195 196
    case TSDB_DATA_TYPE_DOUBLE: {
      double *in  = (double *)pInputData->pData;
      double *out = (double *)pOutputData->pData;
197

198
      for (int32_t i = 0; i < pInput->numOfRows; ++i) {
H
Haojun Liao 已提交
199
        if (colDataIsNull_f(pInputData->nullbitmap, i)) {
200
          colDataSetNull_f(pOutputData->nullbitmap, i);
H
Haojun Liao 已提交
201 202
          continue;
        }
203
        out[i] = d1(in[i]);
204
      }
H
Haojun Liao 已提交
205 206 207 208 209
      break;
    }

    default: {
      colDataAssign(pOutputData, pInputData, pInput->numOfRows);
210
    }
211 212
  }

H
Haojun Liao 已提交
213
  pOutput->numOfRows = pInput->numOfRows;
214 215
  return TSDB_CODE_SUCCESS;
}
216

G
Ganlin Zhao 已提交
217
/** String functions **/
218
static int16_t tlength(char *input, int32_t type) {
G
Ganlin Zhao 已提交
219 220 221
  return varDataLen(input);
}

222
static int16_t tcharlength(char *input, int32_t type) {
G
Ganlin Zhao 已提交
223 224 225 226
  if (type == TSDB_DATA_TYPE_VARCHAR) {
    return varDataLen(input);
  } else { //NCHAR
    return varDataLen(input) / TSDB_NCHAR_SIZE;
G
Ganlin Zhao 已提交
227
  }
G
Ganlin Zhao 已提交
228
}
G
Ganlin Zhao 已提交
229

230
static void tltrim(char *input, char *output, int32_t type, int32_t charLen) {
G
Ganlin Zhao 已提交
231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246
  int32_t numOfSpaces = 0;
  if (type == TSDB_DATA_TYPE_VARCHAR) {
    for (int32_t i = 0; i < charLen; ++i) {
      if (!isspace(*(varDataVal(input) + i))) {
        break;
      }
      numOfSpaces++;
    }
  } else { //NCHAR
    for (int32_t i = 0; i < charLen; ++i) {
      if (!iswspace(*((uint32_t *)varDataVal(input) + i))) {
        break;
      }
      numOfSpaces++;
    }
  }
G
Ganlin Zhao 已提交
247

G
Ganlin Zhao 已提交
248 249 250 251 252 253 254 255
  int32_t resLen;
  if (type == TSDB_DATA_TYPE_VARCHAR) {
    resLen = charLen - numOfSpaces;
    memcpy(varDataVal(output), varDataVal(input) + numOfSpaces, resLen);
  } else {
    resLen = (charLen - numOfSpaces) * TSDB_NCHAR_SIZE;
    memcpy(varDataVal(output), varDataVal(input) + numOfSpaces * TSDB_NCHAR_SIZE, resLen);
  }
G
Ganlin Zhao 已提交
256

G
Ganlin Zhao 已提交
257 258 259
  varDataSetLen(output, resLen);
}

260
static void trtrim(char *input, char *output, int32_t type, int32_t charLen) {
G
Ganlin Zhao 已提交
261 262 263 264 265 266 267 268 269 270 271 272 273 274
  int32_t numOfSpaces = 0;
  if (type == TSDB_DATA_TYPE_VARCHAR) {
    for (int32_t i = charLen - 1; i >= 0; --i) {
      if (!isspace(*(varDataVal(input) + i))) {
        break;
      }
      numOfSpaces++;
    }
  } else { //NCHAR
    for (int32_t i = charLen - 1; i < charLen; ++i) {
      if (!iswspace(*((uint32_t *)varDataVal(input) + i))) {
        break;
      }
      numOfSpaces++;
G
Ganlin Zhao 已提交
275
    }
G
Ganlin Zhao 已提交
276
  }
G
Ganlin Zhao 已提交
277

G
Ganlin Zhao 已提交
278 279 280 281 282
  int32_t resLen;
  if (type == TSDB_DATA_TYPE_VARCHAR) {
    resLen = charLen - numOfSpaces;
  } else {
    resLen = (charLen - numOfSpaces) * TSDB_NCHAR_SIZE;
G
Ganlin Zhao 已提交
283
  }
G
Ganlin Zhao 已提交
284
  memcpy(varDataVal(output), varDataVal(input), resLen);
G
Ganlin Zhao 已提交
285

G
Ganlin Zhao 已提交
286
  varDataSetLen(output, resLen);
G
Ganlin Zhao 已提交
287 288
}

289
static int32_t doLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, _len_fn lenFn) {
G
Ganlin Zhao 已提交
290 291 292 293 294 295 296 297
  int32_t type = GET_PARAM_TYPE(pInput);
  if (inputNum != 1 || !IS_VAR_DATA_TYPE(type)) {
    return TSDB_CODE_FAILED;
  }

  SColumnInfoData *pInputData  = pInput->columnData;
  SColumnInfoData *pOutputData = pOutput->columnData;

G
Ganlin Zhao 已提交
298
  char *in = pInputData->pData;
G
Ganlin Zhao 已提交
299 300 301 302 303 304 305 306
  int16_t *out = (int16_t *)pOutputData->pData;

  for (int32_t i = 0; i < pInput->numOfRows; ++i) {
    if (colDataIsNull_f(pInputData->nullbitmap, i)) {
      colDataSetNull_f(pOutputData->nullbitmap, i);
      continue;
    }

G
Ganlin Zhao 已提交
307 308
    out[i] = lenFn(in, type);
    in += varDataTLen(in);
G
Ganlin Zhao 已提交
309 310 311 312 313
  }

  pOutput->numOfRows = pInput->numOfRows;
  return TSDB_CODE_SUCCESS;
}
314

315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330
static int32_t concatCopyHelper(const char *input, char *output, bool hasNcharCol, int32_t type, int16_t *dataLen) {
  if (hasNcharCol && type == TSDB_DATA_TYPE_VARCHAR) {
    TdUcs4 *newBuf = taosMemoryCalloc((varDataLen(input) + 1) * TSDB_NCHAR_SIZE, 1);
    bool ret = taosMbsToUcs4(varDataVal(input), varDataLen(input), newBuf, (varDataLen(input) + 1) * TSDB_NCHAR_SIZE, NULL);
    if (!ret) {
      taosMemoryFree(newBuf);
      return TSDB_CODE_FAILED;
    }
    memcpy(varDataVal(output) + *dataLen, newBuf, varDataLen(input) * TSDB_NCHAR_SIZE);
    *dataLen += varDataLen(input) * TSDB_NCHAR_SIZE;
    taosMemoryFree(newBuf);
  } else {
    memcpy(varDataVal(output) + *dataLen, varDataVal(input), varDataLen(input));
    *dataLen += varDataLen(input);
  }
  return TSDB_CODE_SUCCESS;
331
}
G
Ganlin Zhao 已提交
332 333 334 335 336 337 338 339

int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  if (inputNum < 2 || inputNum > 8) { // concat accpet 2-8 input strings
    return TSDB_CODE_FAILED;
  }

  SColumnInfoData **pInputData = taosMemoryCalloc(inputNum, sizeof(SColumnInfoData *));
  SColumnInfoData *pOutputData = pOutput->columnData;
340
  char **input = taosMemoryCalloc(inputNum, POINTER_BYTES);
341
  char *outputBuf = NULL;
G
Ganlin Zhao 已提交
342

343
  int32_t inputLen = 0;
344
  int32_t numOfRows = 0;
345
  bool hasNcharCol = false;
346
  for (int32_t i = 0; i < inputNum; ++i) {
347 348
    int32_t type = GET_PARAM_TYPE(&pInput[i]);
    if (!IS_VAR_DATA_TYPE(type)) {
349 350
      return TSDB_CODE_FAILED;
    }
351 352 353
    if (type == TSDB_DATA_TYPE_NCHAR) {
      hasNcharCol = true;
    }
354 355 356 357
    if (pInput[i].numOfRows > numOfRows) {
      numOfRows = pInput[i].numOfRows;
    }
  }
G
Ganlin Zhao 已提交
358 359
  for (int32_t i = 0; i < inputNum; ++i) {
    pInputData[i] = pInput[i].columnData;
360
    input[i] = pInputData[i]->pData;
361 362 363 364
    int32_t factor = 1;
    if (hasNcharCol && (GET_PARAM_TYPE(&pInput[i]) == TSDB_DATA_TYPE_VARCHAR)) {
      factor = TSDB_NCHAR_SIZE;
    }
365
    if (pInput[i].numOfRows == 1) {
366
      inputLen += (pInputData[i]->varmeta.length - VARSTR_HEADER_SIZE) * factor * numOfRows;
367 368 369
    } else {
      inputLen += pInputData[i]->varmeta.length - numOfRows * VARSTR_HEADER_SIZE;
    }
370 371
  }

372 373 374
  int32_t outputLen = inputLen + numOfRows * VARSTR_HEADER_SIZE;
  outputBuf = taosMemoryCalloc(outputLen, 1);
  char *output = outputBuf;
G
Ganlin Zhao 已提交
375 376

  bool hasNull = false;
377
  for (int32_t k = 0; k < numOfRows; ++k) {
G
Ganlin Zhao 已提交
378
    for (int32_t i = 0; i < inputNum; ++i) {
379 380
      if (colDataIsNull_s(pInputData[i], k)) {
        colDataAppendNULL(pOutputData, k);
G
Ganlin Zhao 已提交
381 382 383 384 385 386 387 388 389 390 391
        hasNull = true;
        break;
      }
    }

    if (hasNull) {
      continue;
    }

    int16_t dataLen = 0;
    for (int32_t i = 0; i < inputNum; ++i) {
392 393 394 395
      int32_t ret = concatCopyHelper(input[i], output, hasNcharCol, GET_PARAM_TYPE(&pInput[i]), &dataLen);
      if (ret != TSDB_CODE_SUCCESS) {
        return ret;
      }
396 397 398
      if (pInput[i].numOfRows != 1) {
        input[i] += varDataTLen(input[i]);
      }
G
Ganlin Zhao 已提交
399
    }
400
    varDataSetLen(output, dataLen);
401 402
    colDataAppend(pOutputData, k, output, false);
    output += varDataTLen(output);
G
Ganlin Zhao 已提交
403 404
  }

405
  pOutput->numOfRows = numOfRows;
406
  taosMemoryFree(input);
407
  taosMemoryFree(outputBuf);
G
Ganlin Zhao 已提交
408 409 410 411 412
  taosMemoryFree(pInputData);

  return TSDB_CODE_SUCCESS;
}

413

G
Ganlin Zhao 已提交
414 415 416 417 418 419 420
int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  if (inputNum < 3 || inputNum > 9) { // concat accpet 3-9 input strings including the separator
    return TSDB_CODE_FAILED;
  }

  SColumnInfoData **pInputData = taosMemoryCalloc(inputNum, sizeof(SColumnInfoData *));
  SColumnInfoData *pOutputData = pOutput->columnData;
421
  char **input = taosMemoryCalloc(inputNum, POINTER_BYTES);
422
  char *outputBuf = NULL;
G
Ganlin Zhao 已提交
423

424
  int32_t inputLen = 0;
425
  int32_t numOfRows = 0;
426
  bool hasNcharCol = false;
427
  for (int32_t i = 1; i < inputNum; ++i) {
428 429
    int32_t type = GET_PARAM_TYPE(&pInput[i]);
    if (!IS_VAR_DATA_TYPE(GET_PARAM_TYPE(&pInput[i]))) {
430 431
      return TSDB_CODE_FAILED;
    }
432 433 434
    if (type == TSDB_DATA_TYPE_NCHAR) {
      hasNcharCol = true;
    }
435 436 437 438
    if (pInput[i].numOfRows > numOfRows) {
      numOfRows = pInput[i].numOfRows;
    }
  }
G
Ganlin Zhao 已提交
439 440
  for (int32_t i = 0; i < inputNum; ++i) {
    pInputData[i] = pInput[i].columnData;
441 442 443 444 445
    input[i] = pInputData[i]->pData;
    int32_t factor = 1;
    if (hasNcharCol && (GET_PARAM_TYPE(&pInput[i]) == TSDB_DATA_TYPE_VARCHAR)) {
      factor = TSDB_NCHAR_SIZE;
    }
446 447
    if (i == 0) {
      // calculate required separator space
448
      inputLen += (pInputData[0]->varmeta.length - VARSTR_HEADER_SIZE) * numOfRows * (inputNum - 2) * factor;
449
    } else if (pInput[i].numOfRows == 1) {
450
      inputLen += (pInputData[i]->varmeta.length - VARSTR_HEADER_SIZE) * numOfRows * factor;
451 452 453 454 455
    } else {
      inputLen += pInputData[i]->varmeta.length - numOfRows * VARSTR_HEADER_SIZE;
    }
  }

456 457 458
  int32_t outputLen = inputLen + numOfRows * VARSTR_HEADER_SIZE;
  outputBuf = taosMemoryCalloc(outputLen, 1);
  char *output = outputBuf;
G
Ganlin Zhao 已提交
459

460
  for (int32_t k = 0; k < numOfRows; ++k) {
461 462
    if (colDataIsNull_s(pInputData[0], k)) {
      colDataAppendNULL(pOutputData, k);
G
Ganlin Zhao 已提交
463 464 465 466 467
      continue;
    }

    int16_t dataLen = 0;
    for (int32_t i = 1; i < inputNum; ++i) {
468
      if (colDataIsNull_s(pInputData[i], k)) {
G
Ganlin Zhao 已提交
469 470 471
        continue;
      }

472 473 474 475 476
      int32_t ret = concatCopyHelper(input[i], output, hasNcharCol, GET_PARAM_TYPE(&pInput[i]), &dataLen);
      if (ret != TSDB_CODE_SUCCESS) {
        return ret;
      }

477 478 479
      if (pInput[i].numOfRows != 1) {
        input[i] += varDataTLen(input[i]);
      }
G
Ganlin Zhao 已提交
480 481 482

      if (i < inputNum - 1) {
        //insert the separator
483
        char *sep = pInputData[0]->pData;
484 485 486 487
        int32_t ret = concatCopyHelper(sep, output, hasNcharCol, GET_PARAM_TYPE(&pInput[0]), &dataLen);
        if (ret != TSDB_CODE_SUCCESS) {
          return ret;
        }
G
Ganlin Zhao 已提交
488 489
      }
    }
490
    varDataSetLen(output, dataLen);
491 492
    colDataAppend(pOutputData, k, output, false);
    output += varDataTLen(output);
G
Ganlin Zhao 已提交
493 494
  }

495 496
  pOutput->numOfRows = numOfRows;
  taosMemoryFree(input);
497
  taosMemoryFree(outputBuf);
G
Ganlin Zhao 已提交
498 499 500 501 502
  taosMemoryFree(pInputData);

  return TSDB_CODE_SUCCESS;
}

503
static int32_t doCaseConvFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, _conv_fn convFn) {
G
Ganlin Zhao 已提交
504 505 506 507 508 509 510 511
  int32_t type = GET_PARAM_TYPE(pInput);
  if (inputNum != 1 || !IS_VAR_DATA_TYPE(type)) {
    return TSDB_CODE_FAILED;
  }

  SColumnInfoData *pInputData  = pInput->columnData;
  SColumnInfoData *pOutputData = pOutput->columnData;

512
  char *input  = pInputData->pData;
513 514 515 516 517
  char *output = NULL;

  int32_t outputLen = pInputData->varmeta.length;
  char *outputBuf = taosMemoryCalloc(outputLen, 1);
  output = outputBuf;
518

G
Ganlin Zhao 已提交
519
  for (int32_t i = 0; i < pInput->numOfRows; ++i) {
520 521
    if (colDataIsNull_s(pInputData, i)) {
      colDataAppendNULL(pOutputData, i);
G
Ganlin Zhao 已提交
522 523 524
      continue;
    }

525
    int32_t len = varDataLen(input);
G
Ganlin Zhao 已提交
526 527
    if (type == TSDB_DATA_TYPE_VARCHAR) {
      for (int32_t j = 0; j < len; ++j) {
528
        *(varDataVal(output) + j) = convFn(*(varDataVal(input) + j));
G
Ganlin Zhao 已提交
529 530 531
      }
    } else { //NCHAR
      for (int32_t j = 0; j < len / TSDB_NCHAR_SIZE; ++j) {
532
        *((uint32_t *)varDataVal(output) + j) = convFn(*((uint32_t *)varDataVal(input) + j));
G
Ganlin Zhao 已提交
533 534
      }
    }
535
    varDataSetLen(output, len);
536
    colDataAppend(pOutputData, i, output, false);
537 538
    input += varDataTLen(input);
    output += varDataTLen(output);
G
Ganlin Zhao 已提交
539 540 541
  }

  pOutput->numOfRows = pInput->numOfRows;
542
  taosMemoryFree(outputBuf);
G
Ganlin Zhao 已提交
543 544 545 546 547

  return TSDB_CODE_SUCCESS;
}


548
static int32_t doTrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, _trim_fn trimFn) {
G
Ganlin Zhao 已提交
549 550 551 552 553 554 555 556
  int32_t type = GET_PARAM_TYPE(pInput);
  if (inputNum != 1 || !IS_VAR_DATA_TYPE(type)) {
    return TSDB_CODE_FAILED;
  }

  SColumnInfoData *pInputData  = pInput->columnData;
  SColumnInfoData *pOutputData = pOutput->columnData;

557
  char *input  = pInputData->pData;
558 559 560 561 562
  char *output = NULL;

  int32_t outputLen = pInputData->varmeta.length;
  char *outputBuf = taosMemoryCalloc(outputLen, 1);
  output = outputBuf;
563

G
Ganlin Zhao 已提交
564
  for (int32_t i = 0; i < pInput->numOfRows; ++i) {
565 566
    if (colDataIsNull_s(pInputData, i)) {
      colDataAppendNULL(pOutputData, i);
G
Ganlin Zhao 已提交
567 568 569
      continue;
    }

570
    int32_t len = varDataLen(input);
G
Ganlin Zhao 已提交
571
    int32_t charLen = (type == TSDB_DATA_TYPE_VARCHAR) ? len : len / TSDB_NCHAR_SIZE;
572 573 574
    trimFn(input, output, type, charLen);

    varDataSetLen(output, len);
575
    colDataAppend(pOutputData, i, output, false);
576 577
    input += varDataTLen(input);
    output += varDataTLen(output);
G
Ganlin Zhao 已提交
578 579 580
  }

  pOutput->numOfRows = pInput->numOfRows;
581
  taosMemoryFree(outputBuf);
G
Ganlin Zhao 已提交
582 583 584 585 586

  return TSDB_CODE_SUCCESS;
}

int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
587
  if (inputNum != 2 && inputNum!= 3) {
G
Ganlin Zhao 已提交
588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608
    return TSDB_CODE_FAILED;
  }

  int32_t subPos = 0;
  GET_TYPED_DATA(subPos, int32_t, GET_PARAM_TYPE(&pInput[1]), pInput[1].columnData->pData);
  if (subPos == 0) { //subPos needs to be positive or negative values;
    return TSDB_CODE_FAILED;
  }

  int32_t subLen = INT16_MAX;
  if (inputNum == 3) {
    GET_TYPED_DATA(subLen, int32_t, GET_PARAM_TYPE(&pInput[2]), pInput[2].columnData->pData);
    if (subLen < 0) { //subLen cannot be negative
      return TSDB_CODE_FAILED;
    }
    subLen = (GET_PARAM_TYPE(pInput) == TSDB_DATA_TYPE_VARCHAR) ? subLen : subLen * TSDB_NCHAR_SIZE;
  }

  SColumnInfoData *pInputData  = pInput->columnData;
  SColumnInfoData *pOutputData = pOutput->columnData;

609
  char *input  = pInputData->pData;
610 611
  char *output = NULL;

612
  int32_t outputLen = pInputData->varmeta.length * pInput->numOfRows;
613 614
  char *outputBuf = taosMemoryCalloc(outputLen, 1);
  output = outputBuf;
615 616

  for (int32_t i = 0; i < pInput->numOfRows; ++i) {
617 618
    if (colDataIsNull_s(pInputData, i)) {
      colDataAppendNULL(pOutputData, i);
G
Ganlin Zhao 已提交
619 620 621
      continue;
    }

622
    int32_t len = varDataLen(input);
G
Ganlin Zhao 已提交
623 624 625 626 627 628 629 630 631 632
    int32_t startPosBytes;

    if (subPos > 0) {
      startPosBytes = (GET_PARAM_TYPE(pInput) == TSDB_DATA_TYPE_VARCHAR) ? subPos - 1 : (subPos - 1) * TSDB_NCHAR_SIZE;
      startPosBytes = MIN(startPosBytes, len);
    } else {
      startPosBytes = (GET_PARAM_TYPE(pInput) == TSDB_DATA_TYPE_VARCHAR) ? len + subPos : len + subPos * TSDB_NCHAR_SIZE;
      startPosBytes = MAX(startPosBytes, 0);
    }

633 634 635
    int32_t resLen = MIN(subLen, len - startPosBytes);
    if (resLen > 0) {
      memcpy(varDataVal(output), varDataVal(input) + startPosBytes, resLen);
G
Ganlin Zhao 已提交
636 637
    }

638
    varDataSetLen(output, resLen);
639
    colDataAppend(pOutputData, i , output, false);
640 641
    input += varDataTLen(input);
    output += varDataTLen(output);
G
Ganlin Zhao 已提交
642 643 644
  }

  pOutput->numOfRows = pInput->numOfRows;
645
  taosMemoryFree(outputBuf);
G
Ganlin Zhao 已提交
646 647 648 649 650

  return TSDB_CODE_SUCCESS;
}


651 652 653
int32_t atanFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doScalarFunctionUnique(pInput, inputNum, pOutput, atan);
}
654

655 656 657
int32_t sinFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doScalarFunctionUnique(pInput, inputNum, pOutput, sin);
}
658

659 660 661
int32_t cosFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doScalarFunctionUnique(pInput, inputNum, pOutput, cos);
}
662

663 664 665
int32_t tanFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doScalarFunctionUnique(pInput, inputNum, pOutput, tan);
}
666

667 668 669
int32_t asinFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doScalarFunctionUnique(pInput, inputNum, pOutput, asin);
}
670

671 672 673
int32_t acosFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doScalarFunctionUnique(pInput, inputNum, pOutput, acos);
}
H
Haojun Liao 已提交
674

G
Ganlin Zhao 已提交
675 676 677 678 679 680 681 682
int32_t powFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doScalarFunctionUnique2(pInput, inputNum, pOutput, pow);
}

int32_t logFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doScalarFunctionUnique2(pInput, inputNum, pOutput, tlog);
}

683 684 685
int32_t sqrtFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doScalarFunctionUnique(pInput, inputNum, pOutput, sqrt);
}
686

687 688 689 690 691 692 693 694 695 696
int32_t ceilFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doScalarFunction(pInput, inputNum, pOutput, ceilf, ceil);
}

int32_t floorFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doScalarFunction(pInput, inputNum, pOutput, floorf, floor);
}

int32_t roundFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doScalarFunction(pInput, inputNum, pOutput, roundf, round);
697 698
}

G
Ganlin Zhao 已提交
699 700
int32_t lowerFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doCaseConvFunction(pInput, inputNum, pOutput, tolower);
701 702
}

G
Ganlin Zhao 已提交
703 704
int32_t upperFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doCaseConvFunction(pInput, inputNum, pOutput, toupper);
705 706
}

G
Ganlin Zhao 已提交
707 708
int32_t ltrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doTrimFunction(pInput, inputNum, pOutput, tltrim);
709 710
}

G
Ganlin Zhao 已提交
711 712
int32_t rtrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doTrimFunction(pInput, inputNum, pOutput, trtrim);
713 714
}

G
Ganlin Zhao 已提交
715 716 717 718 719 720 721 722
int32_t lengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doLengthFunction(pInput, inputNum, pOutput, tlength);
}

int32_t charLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doLengthFunction(pInput, inputNum, pOutput, tcharlength);
}

H
Haojun Liao 已提交
723
#if 0
724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787
static void reverseCopy(char* dest, const char* src, int16_t type, int32_t numOfRows) {
  switch(type) {
    case TSDB_DATA_TYPE_TINYINT:
    case TSDB_DATA_TYPE_UTINYINT:{
      int8_t* p = (int8_t*) dest;
      int8_t* pSrc = (int8_t*) src;

      for(int32_t i = 0; i < numOfRows; ++i) {
        p[i] = pSrc[numOfRows - i - 1];
      }
      return;
    }

    case TSDB_DATA_TYPE_SMALLINT:
    case TSDB_DATA_TYPE_USMALLINT:{
      int16_t* p = (int16_t*) dest;
      int16_t* pSrc = (int16_t*) src;

      for(int32_t i = 0; i < numOfRows; ++i) {
        p[i] = pSrc[numOfRows - i - 1];
      }
      return;
    }
    case TSDB_DATA_TYPE_INT:
    case TSDB_DATA_TYPE_UINT: {
      int32_t* p = (int32_t*) dest;
      int32_t* pSrc = (int32_t*) src;

      for(int32_t i = 0; i < numOfRows; ++i) {
        p[i] = pSrc[numOfRows - i - 1];
      }
      return;
    }
    case TSDB_DATA_TYPE_BIGINT:
    case TSDB_DATA_TYPE_UBIGINT: {
      int64_t* p = (int64_t*) dest;
      int64_t* pSrc = (int64_t*) src;

      for(int32_t i = 0; i < numOfRows; ++i) {
        p[i] = pSrc[numOfRows - i - 1];
      }
      return;
    }
    case TSDB_DATA_TYPE_FLOAT: {
      float* p = (float*) dest;
      float* pSrc = (float*) src;

      for(int32_t i = 0; i < numOfRows; ++i) {
        p[i] = pSrc[numOfRows - i - 1];
      }
      return;
    }
    case TSDB_DATA_TYPE_DOUBLE: {
      double* p = (double*) dest;
      double* pSrc = (double*) src;

      for(int32_t i = 0; i < numOfRows; ++i) {
        p[i] = pSrc[numOfRows - i - 1];
      }
      return;
    }
    default: assert(0);
  }
}
H
Haojun Liao 已提交
788
#endif
789

790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819
bool getTimePseudoFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
  pEnv->calcMemSize = sizeof(int64_t);
  return true;
}

int32_t qStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  ASSERT(inputNum == 1);
  colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t *)colDataGetData(pInput->columnData, 0));
}

int32_t qEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  ASSERT(inputNum == 1);
  colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t *)colDataGetData(pInput->columnData, 1));
}

int32_t winDurFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  ASSERT(inputNum == 1);
  colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t *)colDataGetData(pInput->columnData, 2));
}

int32_t winStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  ASSERT(inputNum == 1);
  colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t*) colDataGetData(pInput->columnData, 3));
  return TSDB_CODE_SUCCESS;
}

int32_t winEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  ASSERT(inputNum == 1);
  colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t*) colDataGetData(pInput->columnData, 4));
  return TSDB_CODE_SUCCESS;
820
}