sclfunc.c 31.0 KB
Newer Older
H
Haojun Liao 已提交
1 2 3
#include "function.h"
#include "scalar.h"
#include "tdatablock.h"
H
Haojun Liao 已提交
4
#include "sclInt.h"
D
dapan1121 已提交
5
#include "sclvector.h"
6

G
Ganlin Zhao 已提交
7 8 9 10 11
typedef float (*_float_fn)(float);
typedef double (*_double_fn)(double);
typedef double (*_double_fn_2)(double, double);
typedef int (*_conv_fn)(int);
typedef void (*_trim_fn)(char *, char*, int32_t, int32_t);
G
Ganlin Zhao 已提交
12
typedef int16_t (*_len_fn)(char *, int32_t);
G
Ganlin Zhao 已提交
13

14
/** Math functions **/
15
static double tlog(double v, double base) {
G
Ganlin Zhao 已提交
16
  return log(v) / log(base);
17 18
}

19
int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
H
Haojun Liao 已提交
20 21 22 23 24
  SColumnInfoData *pInputData  = pInput->columnData;
  SColumnInfoData *pOutputData = pOutput->columnData;

  int32_t type = GET_PARAM_TYPE(pInput);
  if (!IS_NUMERIC_TYPE(type)) {
25 26
    return TSDB_CODE_FAILED;
  }
27

H
Haojun Liao 已提交
28
  switch (type) {
29
    case TSDB_DATA_TYPE_FLOAT: {
H
Haojun Liao 已提交
30 31
      float *in  = (float *)pInputData->pData;
      float *out = (float *)pOutputData->pData;
32
      for (int32_t i = 0; i < pInput->numOfRows; ++i) {
H
Haojun Liao 已提交
33
        if (colDataIsNull_f(pInputData->nullbitmap, i)) {
34
          colDataSetNull_f(pOutputData->nullbitmap, i);
H
Haojun Liao 已提交
35 36
          continue;
        }
37
        out[i] = (in[i] >= 0)? in[i] : -in[i];
38
      }
H
Haojun Liao 已提交
39
      break;
40 41 42
    }

    case TSDB_DATA_TYPE_DOUBLE: {
H
Haojun Liao 已提交
43 44
      double *in  = (double *)pInputData->pData;
      double *out = (double *)pOutputData->pData;
45
      for (int32_t i = 0; i < pInput->numOfRows; ++i) {
H
Haojun Liao 已提交
46
        if (colDataIsNull_f(pInputData->nullbitmap, i)) {
47
          colDataSetNull_f(pOutputData->nullbitmap, i);
H
Haojun Liao 已提交
48 49
          continue;
        }
50
        out[i] = (in[i] >= 0)? in[i] : -in[i];
H
Haojun Liao 已提交
51 52
      }
      break;
53 54
    }

H
Haojun Liao 已提交
55
    case TSDB_DATA_TYPE_TINYINT: {
H
Haojun Liao 已提交
56 57
      int8_t *in  = (int8_t *)pInputData->pData;
      int8_t *out = (int8_t *)pOutputData->pData;
58
      for (int32_t i = 0; i < pInput->numOfRows; ++i) {
H
Haojun Liao 已提交
59
        if (colDataIsNull_f(pInputData->nullbitmap, i)) {
60
          colDataSetNull_f(pOutputData->nullbitmap, i);
H
Haojun Liao 已提交
61 62
          continue;
        }
63
        out[i] = (in[i] >= 0)? in[i] : -in[i];
64
      }
H
Haojun Liao 已提交
65 66
      break;
    }
67

H
Haojun Liao 已提交
68
    case TSDB_DATA_TYPE_SMALLINT: {
H
Haojun Liao 已提交
69 70
      int16_t *in  = (int16_t *)pInputData->pData;
      int16_t *out = (int16_t *)pOutputData->pData;
71
      for (int32_t i = 0; i < pInput->numOfRows; ++i) {
H
Haojun Liao 已提交
72
        if (colDataIsNull_f(pInputData->nullbitmap, i)) {
73
          colDataSetNull_f(pOutputData->nullbitmap, i);
H
Haojun Liao 已提交
74 75
          continue;
        }
76
        out[i] = (in[i] >= 0)? in[i] : -in[i];
77
      }
H
Haojun Liao 已提交
78 79
      break;
    }
80

H
Haojun Liao 已提交
81
    case TSDB_DATA_TYPE_INT: {
H
Haojun Liao 已提交
82 83
      int32_t *in  = (int32_t *)pInputData->pData;
      int32_t *out = (int32_t *)pOutputData->pData;
84
      for (int32_t i = 0; i < pInput->numOfRows; ++i) {
H
Haojun Liao 已提交
85
        if (colDataIsNull_f(pInputData->nullbitmap, i)) {
86
          colDataSetNull_f(pOutputData->nullbitmap, i);
H
Haojun Liao 已提交
87 88
          continue;
        }
89
        out[i] = (in[i] >= 0)? in[i] : -in[i];
90
      }
H
Haojun Liao 已提交
91 92
      break;
    }
93

H
Haojun Liao 已提交
94
    case TSDB_DATA_TYPE_BIGINT: {
H
Haojun Liao 已提交
95 96
      int64_t *in  = (int64_t *)pInputData->pData;
      int64_t *out = (int64_t *)pOutputData->pData;
97
      for (int32_t i = 0; i < pInput->numOfRows; ++i) {
H
Haojun Liao 已提交
98
        if (colDataIsNull_f(pInputData->nullbitmap, i)) {
99
          colDataSetNull_f(pOutputData->nullbitmap, i);
H
Haojun Liao 已提交
100 101
          continue;
        }
102
        out[i] = (in[i] >= 0)? in[i] : -in[i];
103
      }
H
Haojun Liao 已提交
104
      break;
105 106
    }

H
Haojun Liao 已提交
107 108
    default: {
      colDataAssign(pOutputData, pInputData, pInput->numOfRows);
109 110
    }
  }
111

H
Haojun Liao 已提交
112
  pOutput->numOfRows = pInput->numOfRows;
113
  return TSDB_CODE_SUCCESS;
114 115
}

116
static int32_t doScalarFunctionUnique(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _double_fn valFn) {
H
Haojun Liao 已提交
117 118
  int32_t type = GET_PARAM_TYPE(pInput);
  if (inputNum != 1 || !IS_NUMERIC_TYPE(type)) {
119 120
    return TSDB_CODE_FAILED;
  }
121

122
  SColumnInfoData *pInputData = pInput->columnData;
H
Haojun Liao 已提交
123
  SColumnInfoData *pOutputData = pOutput->columnData;
124

125
  _getDoubleValue_fn_t getValueFn = getVectorDoubleValueFn(type);
126

127
  double *out = (double *)pOutputData->pData;
128

129 130 131
  for (int32_t i = 0; i < pInput->numOfRows; ++i) {
    if (colDataIsNull_f(pInputData->nullbitmap, i)) {
      colDataSetNull_f(pOutputData->nullbitmap, i);
132
      continue;
133
    }
134
    out[i] = valFn(getValueFn(pInputData->pData, i));
135
  }
136

H
Haojun Liao 已提交
137
  pOutput->numOfRows = pInput->numOfRows;
138
  return TSDB_CODE_SUCCESS;
139 140
}

141
static int32_t doScalarFunctionUnique2(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _double_fn_2 valFn) {
G
Ganlin Zhao 已提交
142
  if (inputNum != 2 || !IS_NUMERIC_TYPE(GET_PARAM_TYPE(&pInput[0])) || !IS_NUMERIC_TYPE(GET_PARAM_TYPE(&pInput[1]))) {
143 144 145
    return TSDB_CODE_FAILED;
  }

G
Ganlin Zhao 已提交
146
  SColumnInfoData *pInputData[2];
H
Haojun Liao 已提交
147
  SColumnInfoData *pOutputData = pOutput->columnData;
G
Ganlin Zhao 已提交
148
  _getDoubleValue_fn_t getValueFn[2];
149

G
Ganlin Zhao 已提交
150 151 152 153
  for (int32_t i = 0; i < inputNum; ++i) {
    pInputData[i] = pInput[i].columnData;
    getValueFn[i]= getVectorDoubleValueFn(GET_PARAM_TYPE(&pInput[i]));
  }
154

155
  double *out = (double *)pOutputData->pData;
156

157
  for (int32_t i = 0; i < pInput->numOfRows; ++i) {
G
Ganlin Zhao 已提交
158 159
    if (colDataIsNull_f(pInputData[0]->nullbitmap, i) ||
        colDataIsNull_f(pInputData[1]->nullbitmap, 0)) {
160 161
      colDataSetNull_f(pOutputData->nullbitmap, i);
      continue;
162
    }
G
Ganlin Zhao 已提交
163
    out[i] = valFn(getValueFn[0](pInputData[0]->pData, i), getValueFn[1](pInputData[1]->pData, 0));
164
  }
165

H
Haojun Liao 已提交
166
  pOutput->numOfRows = pInput->numOfRows;
167 168 169
  return TSDB_CODE_SUCCESS;
}

170
static int32_t doScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _float_fn f1, _double_fn d1) {
H
Haojun Liao 已提交
171 172
  int32_t type = GET_PARAM_TYPE(pInput);
  if (inputNum != 1 || !IS_NUMERIC_TYPE(type)) {
173 174 175
    return TSDB_CODE_FAILED;
  }

H
Haojun Liao 已提交
176 177
  SColumnInfoData *pInputData  = pInput->columnData;
  SColumnInfoData *pOutputData = pOutput->columnData;
178

H
Haojun Liao 已提交
179 180 181 182
  switch (type) {
    case TSDB_DATA_TYPE_FLOAT: {
      float *in  = (float *)pInputData->pData;
      float *out = (float *)pOutputData->pData;
183

184
      for (int32_t i = 0; i < pInput->numOfRows; ++i) {
H
Haojun Liao 已提交
185
        if (colDataIsNull_f(pInputData->nullbitmap, i)) {
186
          colDataSetNull_f(pOutputData->nullbitmap, i);
H
Haojun Liao 已提交
187 188
          continue;
        }
189
        out[i] = f1(in[i]);
190
      }
H
Haojun Liao 已提交
191 192
      break;
    }
193

H
Haojun Liao 已提交
194 195 196
    case TSDB_DATA_TYPE_DOUBLE: {
      double *in  = (double *)pInputData->pData;
      double *out = (double *)pOutputData->pData;
197

198
      for (int32_t i = 0; i < pInput->numOfRows; ++i) {
H
Haojun Liao 已提交
199
        if (colDataIsNull_f(pInputData->nullbitmap, i)) {
200
          colDataSetNull_f(pOutputData->nullbitmap, i);
H
Haojun Liao 已提交
201 202
          continue;
        }
203
        out[i] = d1(in[i]);
204
      }
H
Haojun Liao 已提交
205 206 207 208 209
      break;
    }

    default: {
      colDataAssign(pOutputData, pInputData, pInput->numOfRows);
210
    }
211 212
  }

H
Haojun Liao 已提交
213
  pOutput->numOfRows = pInput->numOfRows;
214 215
  return TSDB_CODE_SUCCESS;
}
216

G
Ganlin Zhao 已提交
217
/** String functions **/
218
static int16_t tlength(char *input, int32_t type) {
G
Ganlin Zhao 已提交
219 220 221
  return varDataLen(input);
}

222
static int16_t tcharlength(char *input, int32_t type) {
G
Ganlin Zhao 已提交
223 224 225 226
  if (type == TSDB_DATA_TYPE_VARCHAR) {
    return varDataLen(input);
  } else { //NCHAR
    return varDataLen(input) / TSDB_NCHAR_SIZE;
G
Ganlin Zhao 已提交
227
  }
G
Ganlin Zhao 已提交
228
}
G
Ganlin Zhao 已提交
229

230
static void tltrim(char *input, char *output, int32_t type, int32_t charLen) {
G
Ganlin Zhao 已提交
231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246
  int32_t numOfSpaces = 0;
  if (type == TSDB_DATA_TYPE_VARCHAR) {
    for (int32_t i = 0; i < charLen; ++i) {
      if (!isspace(*(varDataVal(input) + i))) {
        break;
      }
      numOfSpaces++;
    }
  } else { //NCHAR
    for (int32_t i = 0; i < charLen; ++i) {
      if (!iswspace(*((uint32_t *)varDataVal(input) + i))) {
        break;
      }
      numOfSpaces++;
    }
  }
G
Ganlin Zhao 已提交
247

G
Ganlin Zhao 已提交
248 249 250 251 252 253 254 255
  int32_t resLen;
  if (type == TSDB_DATA_TYPE_VARCHAR) {
    resLen = charLen - numOfSpaces;
    memcpy(varDataVal(output), varDataVal(input) + numOfSpaces, resLen);
  } else {
    resLen = (charLen - numOfSpaces) * TSDB_NCHAR_SIZE;
    memcpy(varDataVal(output), varDataVal(input) + numOfSpaces * TSDB_NCHAR_SIZE, resLen);
  }
G
Ganlin Zhao 已提交
256

G
Ganlin Zhao 已提交
257 258 259
  varDataSetLen(output, resLen);
}

260
static void trtrim(char *input, char *output, int32_t type, int32_t charLen) {
G
Ganlin Zhao 已提交
261 262 263 264 265 266 267 268 269
  int32_t numOfSpaces = 0;
  if (type == TSDB_DATA_TYPE_VARCHAR) {
    for (int32_t i = charLen - 1; i >= 0; --i) {
      if (!isspace(*(varDataVal(input) + i))) {
        break;
      }
      numOfSpaces++;
    }
  } else { //NCHAR
270
    for (int32_t i = charLen - 1; i >= 0; --i) {
G
Ganlin Zhao 已提交
271 272 273 274
      if (!iswspace(*((uint32_t *)varDataVal(input) + i))) {
        break;
      }
      numOfSpaces++;
G
Ganlin Zhao 已提交
275
    }
G
Ganlin Zhao 已提交
276
  }
G
Ganlin Zhao 已提交
277

G
Ganlin Zhao 已提交
278 279 280 281 282
  int32_t resLen;
  if (type == TSDB_DATA_TYPE_VARCHAR) {
    resLen = charLen - numOfSpaces;
  } else {
    resLen = (charLen - numOfSpaces) * TSDB_NCHAR_SIZE;
G
Ganlin Zhao 已提交
283
  }
G
Ganlin Zhao 已提交
284
  memcpy(varDataVal(output), varDataVal(input), resLen);
G
Ganlin Zhao 已提交
285

G
Ganlin Zhao 已提交
286
  varDataSetLen(output, resLen);
G
Ganlin Zhao 已提交
287 288
}

289
static int32_t doLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, _len_fn lenFn) {
G
Ganlin Zhao 已提交
290 291 292 293 294 295 296 297
  int32_t type = GET_PARAM_TYPE(pInput);
  if (inputNum != 1 || !IS_VAR_DATA_TYPE(type)) {
    return TSDB_CODE_FAILED;
  }

  SColumnInfoData *pInputData  = pInput->columnData;
  SColumnInfoData *pOutputData = pOutput->columnData;

298
  char *in = pInputData->pData + pInputData->varmeta.offset[0];
G
Ganlin Zhao 已提交
299 300 301
  int16_t *out = (int16_t *)pOutputData->pData;

  for (int32_t i = 0; i < pInput->numOfRows; ++i) {
302
    if (colDataIsNull_s(pInputData, i)) {
G
Ganlin Zhao 已提交
303 304 305 306
      colDataSetNull_f(pOutputData->nullbitmap, i);
      continue;
    }

G
Ganlin Zhao 已提交
307 308
    out[i] = lenFn(in, type);
    in += varDataTLen(in);
G
Ganlin Zhao 已提交
309 310 311 312 313
  }

  pOutput->numOfRows = pInput->numOfRows;
  return TSDB_CODE_SUCCESS;
}
314

315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330
static int32_t concatCopyHelper(const char *input, char *output, bool hasNcharCol, int32_t type, int16_t *dataLen) {
  if (hasNcharCol && type == TSDB_DATA_TYPE_VARCHAR) {
    TdUcs4 *newBuf = taosMemoryCalloc((varDataLen(input) + 1) * TSDB_NCHAR_SIZE, 1);
    bool ret = taosMbsToUcs4(varDataVal(input), varDataLen(input), newBuf, (varDataLen(input) + 1) * TSDB_NCHAR_SIZE, NULL);
    if (!ret) {
      taosMemoryFree(newBuf);
      return TSDB_CODE_FAILED;
    }
    memcpy(varDataVal(output) + *dataLen, newBuf, varDataLen(input) * TSDB_NCHAR_SIZE);
    *dataLen += varDataLen(input) * TSDB_NCHAR_SIZE;
    taosMemoryFree(newBuf);
  } else {
    memcpy(varDataVal(output) + *dataLen, varDataVal(input), varDataLen(input));
    *dataLen += varDataLen(input);
  }
  return TSDB_CODE_SUCCESS;
331
}
G
Ganlin Zhao 已提交
332 333 334 335 336 337 338 339

int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  if (inputNum < 2 || inputNum > 8) { // concat accpet 2-8 input strings
    return TSDB_CODE_FAILED;
  }

  SColumnInfoData **pInputData = taosMemoryCalloc(inputNum, sizeof(SColumnInfoData *));
  SColumnInfoData *pOutputData = pOutput->columnData;
340
  char **input = taosMemoryCalloc(inputNum, POINTER_BYTES);
341
  char *outputBuf = NULL;
G
Ganlin Zhao 已提交
342

343
  int32_t inputLen = 0;
344
  int32_t numOfRows = 0;
345
  bool hasNcharCol = false;
346
  for (int32_t i = 0; i < inputNum; ++i) {
347 348
    int32_t type = GET_PARAM_TYPE(&pInput[i]);
    if (!IS_VAR_DATA_TYPE(type)) {
349 350
      return TSDB_CODE_FAILED;
    }
351 352 353
    if (type == TSDB_DATA_TYPE_NCHAR) {
      hasNcharCol = true;
    }
354 355 356 357
    if (pInput[i].numOfRows > numOfRows) {
      numOfRows = pInput[i].numOfRows;
    }
  }
G
Ganlin Zhao 已提交
358 359
  for (int32_t i = 0; i < inputNum; ++i) {
    pInputData[i] = pInput[i].columnData;
360
    input[i] = pInputData[i]->pData + pInputData[i]->varmeta.offset[0];
361 362 363 364
    int32_t factor = 1;
    if (hasNcharCol && (GET_PARAM_TYPE(&pInput[i]) == TSDB_DATA_TYPE_VARCHAR)) {
      factor = TSDB_NCHAR_SIZE;
    }
365
    if (pInput[i].numOfRows == 1) {
366
      inputLen += (pInputData[i]->varmeta.length - VARSTR_HEADER_SIZE) * factor * numOfRows;
367 368 369
    } else {
      inputLen += pInputData[i]->varmeta.length - numOfRows * VARSTR_HEADER_SIZE;
    }
370 371
  }

372 373 374
  int32_t outputLen = inputLen + numOfRows * VARSTR_HEADER_SIZE;
  outputBuf = taosMemoryCalloc(outputLen, 1);
  char *output = outputBuf;
G
Ganlin Zhao 已提交
375 376

  bool hasNull = false;
377
  for (int32_t k = 0; k < numOfRows; ++k) {
G
Ganlin Zhao 已提交
378
    for (int32_t i = 0; i < inputNum; ++i) {
379 380
      if (colDataIsNull_s(pInputData[i], k)) {
        colDataAppendNULL(pOutputData, k);
G
Ganlin Zhao 已提交
381 382 383 384 385 386 387 388 389 390 391
        hasNull = true;
        break;
      }
    }

    if (hasNull) {
      continue;
    }

    int16_t dataLen = 0;
    for (int32_t i = 0; i < inputNum; ++i) {
392 393 394 395
      int32_t ret = concatCopyHelper(input[i], output, hasNcharCol, GET_PARAM_TYPE(&pInput[i]), &dataLen);
      if (ret != TSDB_CODE_SUCCESS) {
        return ret;
      }
396 397 398
      if (pInput[i].numOfRows != 1) {
        input[i] += varDataTLen(input[i]);
      }
G
Ganlin Zhao 已提交
399
    }
400
    varDataSetLen(output, dataLen);
401 402
    colDataAppend(pOutputData, k, output, false);
    output += varDataTLen(output);
G
Ganlin Zhao 已提交
403 404
  }

405
  pOutput->numOfRows = numOfRows;
406
  taosMemoryFree(input);
407
  taosMemoryFree(outputBuf);
G
Ganlin Zhao 已提交
408 409 410 411 412
  taosMemoryFree(pInputData);

  return TSDB_CODE_SUCCESS;
}

413

G
Ganlin Zhao 已提交
414 415 416 417 418 419 420
int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  if (inputNum < 3 || inputNum > 9) { // concat accpet 3-9 input strings including the separator
    return TSDB_CODE_FAILED;
  }

  SColumnInfoData **pInputData = taosMemoryCalloc(inputNum, sizeof(SColumnInfoData *));
  SColumnInfoData *pOutputData = pOutput->columnData;
421
  char **input = taosMemoryCalloc(inputNum, POINTER_BYTES);
422
  char *outputBuf = NULL;
G
Ganlin Zhao 已提交
423

424
  int32_t inputLen = 0;
425
  int32_t numOfRows = 0;
426
  bool hasNcharCol = false;
427
  for (int32_t i = 1; i < inputNum; ++i) {
428 429
    int32_t type = GET_PARAM_TYPE(&pInput[i]);
    if (!IS_VAR_DATA_TYPE(GET_PARAM_TYPE(&pInput[i]))) {
430 431
      return TSDB_CODE_FAILED;
    }
432 433 434
    if (type == TSDB_DATA_TYPE_NCHAR) {
      hasNcharCol = true;
    }
435 436 437 438
    if (pInput[i].numOfRows > numOfRows) {
      numOfRows = pInput[i].numOfRows;
    }
  }
G
Ganlin Zhao 已提交
439 440
  for (int32_t i = 0; i < inputNum; ++i) {
    pInputData[i] = pInput[i].columnData;
441
    input[i] = pInputData[i]->pData + pInputData[i]->varmeta.offset[0];
442 443 444 445
    int32_t factor = 1;
    if (hasNcharCol && (GET_PARAM_TYPE(&pInput[i]) == TSDB_DATA_TYPE_VARCHAR)) {
      factor = TSDB_NCHAR_SIZE;
    }
446 447
    if (i == 0) {
      // calculate required separator space
448
      inputLen += (pInputData[0]->varmeta.length - VARSTR_HEADER_SIZE) * numOfRows * (inputNum - 2) * factor;
449
    } else if (pInput[i].numOfRows == 1) {
450
      inputLen += (pInputData[i]->varmeta.length - VARSTR_HEADER_SIZE) * numOfRows * factor;
451 452 453 454 455
    } else {
      inputLen += pInputData[i]->varmeta.length - numOfRows * VARSTR_HEADER_SIZE;
    }
  }

456 457 458
  int32_t outputLen = inputLen + numOfRows * VARSTR_HEADER_SIZE;
  outputBuf = taosMemoryCalloc(outputLen, 1);
  char *output = outputBuf;
G
Ganlin Zhao 已提交
459

460
  for (int32_t k = 0; k < numOfRows; ++k) {
461 462
    if (colDataIsNull_s(pInputData[0], k)) {
      colDataAppendNULL(pOutputData, k);
G
Ganlin Zhao 已提交
463 464 465 466 467
      continue;
    }

    int16_t dataLen = 0;
    for (int32_t i = 1; i < inputNum; ++i) {
468
      if (colDataIsNull_s(pInputData[i], k)) {
G
Ganlin Zhao 已提交
469 470 471
        continue;
      }

472 473 474 475 476
      int32_t ret = concatCopyHelper(input[i], output, hasNcharCol, GET_PARAM_TYPE(&pInput[i]), &dataLen);
      if (ret != TSDB_CODE_SUCCESS) {
        return ret;
      }

477 478 479
      if (pInput[i].numOfRows != 1) {
        input[i] += varDataTLen(input[i]);
      }
G
Ganlin Zhao 已提交
480 481 482

      if (i < inputNum - 1) {
        //insert the separator
483
        char *sep = pInputData[0]->pData;
484 485 486 487
        int32_t ret = concatCopyHelper(sep, output, hasNcharCol, GET_PARAM_TYPE(&pInput[0]), &dataLen);
        if (ret != TSDB_CODE_SUCCESS) {
          return ret;
        }
G
Ganlin Zhao 已提交
488 489
      }
    }
490
    varDataSetLen(output, dataLen);
491 492
    colDataAppend(pOutputData, k, output, false);
    output += varDataTLen(output);
G
Ganlin Zhao 已提交
493 494
  }

495 496
  pOutput->numOfRows = numOfRows;
  taosMemoryFree(input);
497
  taosMemoryFree(outputBuf);
G
Ganlin Zhao 已提交
498 499 500 501 502
  taosMemoryFree(pInputData);

  return TSDB_CODE_SUCCESS;
}

503
static int32_t doCaseConvFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, _conv_fn convFn) {
G
Ganlin Zhao 已提交
504 505 506 507 508 509 510 511
  int32_t type = GET_PARAM_TYPE(pInput);
  if (inputNum != 1 || !IS_VAR_DATA_TYPE(type)) {
    return TSDB_CODE_FAILED;
  }

  SColumnInfoData *pInputData  = pInput->columnData;
  SColumnInfoData *pOutputData = pOutput->columnData;

512
  char *input  = pInputData->pData + pInputData->varmeta.offset[0];
513 514 515 516 517
  char *output = NULL;

  int32_t outputLen = pInputData->varmeta.length;
  char *outputBuf = taosMemoryCalloc(outputLen, 1);
  output = outputBuf;
518

G
Ganlin Zhao 已提交
519
  for (int32_t i = 0; i < pInput->numOfRows; ++i) {
520 521
    if (colDataIsNull_s(pInputData, i)) {
      colDataAppendNULL(pOutputData, i);
G
Ganlin Zhao 已提交
522 523 524
      continue;
    }

525
    int32_t len = varDataLen(input);
G
Ganlin Zhao 已提交
526 527
    if (type == TSDB_DATA_TYPE_VARCHAR) {
      for (int32_t j = 0; j < len; ++j) {
528
        *(varDataVal(output) + j) = convFn(*(varDataVal(input) + j));
G
Ganlin Zhao 已提交
529 530 531
      }
    } else { //NCHAR
      for (int32_t j = 0; j < len / TSDB_NCHAR_SIZE; ++j) {
532
        *((uint32_t *)varDataVal(output) + j) = convFn(*((uint32_t *)varDataVal(input) + j));
G
Ganlin Zhao 已提交
533 534
      }
    }
535
    varDataSetLen(output, len);
536
    colDataAppend(pOutputData, i, output, false);
537 538
    input += varDataTLen(input);
    output += varDataTLen(output);
G
Ganlin Zhao 已提交
539 540 541
  }

  pOutput->numOfRows = pInput->numOfRows;
542
  taosMemoryFree(outputBuf);
G
Ganlin Zhao 已提交
543 544 545 546 547

  return TSDB_CODE_SUCCESS;
}


548
static int32_t doTrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, _trim_fn trimFn) {
G
Ganlin Zhao 已提交
549 550 551 552 553 554 555 556
  int32_t type = GET_PARAM_TYPE(pInput);
  if (inputNum != 1 || !IS_VAR_DATA_TYPE(type)) {
    return TSDB_CODE_FAILED;
  }

  SColumnInfoData *pInputData  = pInput->columnData;
  SColumnInfoData *pOutputData = pOutput->columnData;

557
  char *input  = pInputData->pData + pInputData->varmeta.offset[0];
558 559 560 561 562
  char *output = NULL;

  int32_t outputLen = pInputData->varmeta.length;
  char *outputBuf = taosMemoryCalloc(outputLen, 1);
  output = outputBuf;
563

G
Ganlin Zhao 已提交
564
  for (int32_t i = 0; i < pInput->numOfRows; ++i) {
565 566
    if (colDataIsNull_s(pInputData, i)) {
      colDataAppendNULL(pOutputData, i);
G
Ganlin Zhao 已提交
567 568 569
      continue;
    }

570
    int32_t len = varDataLen(input);
G
Ganlin Zhao 已提交
571
    int32_t charLen = (type == TSDB_DATA_TYPE_VARCHAR) ? len : len / TSDB_NCHAR_SIZE;
572 573 574
    trimFn(input, output, type, charLen);

    varDataSetLen(output, len);
575
    colDataAppend(pOutputData, i, output, false);
576 577
    input += varDataTLen(input);
    output += varDataTLen(output);
G
Ganlin Zhao 已提交
578 579 580
  }

  pOutput->numOfRows = pInput->numOfRows;
581
  taosMemoryFree(outputBuf);
G
Ganlin Zhao 已提交
582 583 584 585 586

  return TSDB_CODE_SUCCESS;
}

int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
587
  if (inputNum != 2 && inputNum!= 3) {
G
Ganlin Zhao 已提交
588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608
    return TSDB_CODE_FAILED;
  }

  int32_t subPos = 0;
  GET_TYPED_DATA(subPos, int32_t, GET_PARAM_TYPE(&pInput[1]), pInput[1].columnData->pData);
  if (subPos == 0) { //subPos needs to be positive or negative values;
    return TSDB_CODE_FAILED;
  }

  int32_t subLen = INT16_MAX;
  if (inputNum == 3) {
    GET_TYPED_DATA(subLen, int32_t, GET_PARAM_TYPE(&pInput[2]), pInput[2].columnData->pData);
    if (subLen < 0) { //subLen cannot be negative
      return TSDB_CODE_FAILED;
    }
    subLen = (GET_PARAM_TYPE(pInput) == TSDB_DATA_TYPE_VARCHAR) ? subLen : subLen * TSDB_NCHAR_SIZE;
  }

  SColumnInfoData *pInputData  = pInput->columnData;
  SColumnInfoData *pOutputData = pOutput->columnData;

609
  char *input  = pInputData->pData + pInputData->varmeta.offset[0];
610 611
  char *output = NULL;

612
  int32_t outputLen = pInputData->varmeta.length * pInput->numOfRows;
613 614
  char *outputBuf = taosMemoryCalloc(outputLen, 1);
  output = outputBuf;
615 616

  for (int32_t i = 0; i < pInput->numOfRows; ++i) {
617 618
    if (colDataIsNull_s(pInputData, i)) {
      colDataAppendNULL(pOutputData, i);
G
Ganlin Zhao 已提交
619 620 621
      continue;
    }

622
    int32_t len = varDataLen(input);
G
Ganlin Zhao 已提交
623 624 625 626 627 628 629 630 631 632
    int32_t startPosBytes;

    if (subPos > 0) {
      startPosBytes = (GET_PARAM_TYPE(pInput) == TSDB_DATA_TYPE_VARCHAR) ? subPos - 1 : (subPos - 1) * TSDB_NCHAR_SIZE;
      startPosBytes = MIN(startPosBytes, len);
    } else {
      startPosBytes = (GET_PARAM_TYPE(pInput) == TSDB_DATA_TYPE_VARCHAR) ? len + subPos : len + subPos * TSDB_NCHAR_SIZE;
      startPosBytes = MAX(startPosBytes, 0);
    }

633 634 635
    int32_t resLen = MIN(subLen, len - startPosBytes);
    if (resLen > 0) {
      memcpy(varDataVal(output), varDataVal(input) + startPosBytes, resLen);
G
Ganlin Zhao 已提交
636 637
    }

638
    varDataSetLen(output, resLen);
639
    colDataAppend(pOutputData, i , output, false);
640 641
    input += varDataTLen(input);
    output += varDataTLen(output);
G
Ganlin Zhao 已提交
642 643 644
  }

  pOutput->numOfRows = pInput->numOfRows;
645
  taosMemoryFree(outputBuf);
G
Ganlin Zhao 已提交
646 647 648 649

  return TSDB_CODE_SUCCESS;
}

G
Ganlin Zhao 已提交
650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802
int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  if (inputNum!= 3) {
    return TSDB_CODE_FAILED;
  }

  int16_t inputType  = pInput[0].columnData->info.type;
  int16_t outputType = *(int16_t *)pInput[1].columnData->pData;
  if (outputType != TSDB_DATA_TYPE_BIGINT && outputType != TSDB_DATA_TYPE_UBIGINT &&
      outputType != TSDB_DATA_TYPE_VARCHAR && outputType != TSDB_DATA_TYPE_NCHAR &&
      outputType != TSDB_DATA_TYPE_TIMESTAMP) {
    return TSDB_CODE_FAILED;
  }
  int64_t outputLen = *(int64_t *)pInput[2].columnData->pData;

  char *input = NULL;
  char *outputBuf = taosMemoryCalloc(outputLen * pInput[0].numOfRows, 1);
  char *output = outputBuf;
  if (IS_VAR_DATA_TYPE(inputType)) {
    input = pInput[0].columnData->pData + pInput[0].columnData->varmeta.offset[0];
  } else {
    input = pInput[0].columnData->pData;
  }

  for (int32_t i = 0; i < pInput[0].numOfRows; ++i) {
    if (colDataIsNull_s(pInput[0].columnData, i)) {
      colDataAppendNULL(pOutput->columnData, i);
      continue;
    }

    switch(outputType) {
      case TSDB_DATA_TYPE_BIGINT: {
        if (inputType == TSDB_DATA_TYPE_BINARY) {
          memcpy(output, varDataVal(input), varDataLen(input));
          *(int64_t *)output = strtoll(output, NULL, 10);
        } else if (inputType == TSDB_DATA_TYPE_NCHAR) {
          char *newBuf = taosMemoryCalloc(1, outputLen * TSDB_NCHAR_SIZE + 1);
          int32_t len  = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), newBuf);
          if (len < 0) {
            taosMemoryFree(newBuf);
            return TSDB_CODE_FAILED;
          }
          newBuf[len] = 0;
          *(int64_t *)output = strtoll(newBuf, NULL, 10);
          taosMemoryFree(newBuf);
        } else {
          GET_TYPED_DATA(*(int64_t *)output, int64_t, inputType, input);
        }
        break;
      }
      case TSDB_DATA_TYPE_UBIGINT: {
        if (inputType == TSDB_DATA_TYPE_BINARY) {
          memcpy(output, varDataVal(input), varDataLen(input));
          *(uint64_t *)output = strtoull(output, NULL, 10);
        } else if (inputType == TSDB_DATA_TYPE_NCHAR) {
          char *newBuf = taosMemoryCalloc(1, outputLen * TSDB_NCHAR_SIZE + 1);
          int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), newBuf);
          if (len < 0) {
            taosMemoryFree(newBuf);
            return TSDB_CODE_FAILED;
          }
          newBuf[len] = 0;
          *(uint64_t *)output = strtoull(newBuf, NULL, 10);
          taosMemoryFree(newBuf);
        } else {
          GET_TYPED_DATA(*(uint64_t *)output, uint64_t, inputType, input);
        }
        break;
      }
      case TSDB_DATA_TYPE_TIMESTAMP: {
        if (inputType == TSDB_DATA_TYPE_BINARY || inputType == TSDB_DATA_TYPE_NCHAR) {
          //not support
          return TSDB_CODE_FAILED;
        } else {
          GET_TYPED_DATA(*(int64_t *)output, int64_t, inputType, input);
        }
        break;
      }
      case TSDB_DATA_TYPE_BINARY: {
        if (inputType == TSDB_DATA_TYPE_BOOL) {
          int32_t len = sprintf(varDataVal(output), "%.*s", (int32_t)(outputLen - VARSTR_HEADER_SIZE), *(int8_t *)input ? "true" : "false");
          varDataSetLen(output, len);
        } else if (inputType == TSDB_DATA_TYPE_BINARY) {
          int32_t len = sprintf(varDataVal(output), "%.*s", (int32_t)(outputLen - VARSTR_HEADER_SIZE), varDataVal(input));
          varDataSetLen(output, len);
        } else if (inputType == TSDB_DATA_TYPE_BINARY || inputType == TSDB_DATA_TYPE_NCHAR) {
          //not support
          return TSDB_CODE_FAILED;
        } else {
          char tmp[400] = {0};
          NUM_TO_STRING(inputType, input, sizeof(tmp), tmp);
          int32_t len = (int32_t)strlen(tmp);
          len = (outputLen - VARSTR_HEADER_SIZE) > len ? len : (outputLen - VARSTR_HEADER_SIZE);
          memcpy(varDataVal(output), tmp, len);
          varDataSetLen(output, len);
        }
        break;
      }
      case TSDB_DATA_TYPE_NCHAR: {
        int32_t outputCharLen = (outputLen - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        if (inputType == TSDB_DATA_TYPE_BOOL) {
          char tmp[8] = {0};
          int32_t len = sprintf(tmp, "%.*s", outputCharLen, *(int8_t *)input ? "true" : "false" );
          bool ret = taosMbsToUcs4(tmp, len, (TdUcs4 *)varDataVal(output), outputLen - VARSTR_HEADER_SIZE, &len);
          if (!ret) {
            return TSDB_CODE_FAILED;
          }
          varDataSetLen(output, len);
        } else if (inputType == TSDB_DATA_TYPE_BINARY) {
          int32_t len = outputCharLen > varDataLen(input) ? varDataLen(input) : outputCharLen;
          bool ret = taosMbsToUcs4(input + VARSTR_HEADER_SIZE, len, (TdUcs4 *)varDataVal(output), outputLen - VARSTR_HEADER_SIZE, &len);
          if (!ret) {
            return TSDB_CODE_FAILED;
          }
          varDataSetLen(output, len);
        } else if (inputType == TSDB_DATA_TYPE_NCHAR) {
          int32_t len = MIN(outputLen, varDataLen(input) + VARSTR_HEADER_SIZE);
          memcpy(output, input, len);
          varDataSetLen(output, len - VARSTR_HEADER_SIZE);
        } else {
          char tmp[400] = {0};
          NUM_TO_STRING(inputType, input, sizeof(tmp), tmp);
          int32_t len = (int32_t)strlen(tmp);
          len = outputCharLen > len ? len : outputCharLen;
          bool ret = taosMbsToUcs4(tmp, len, (TdUcs4 *)varDataVal(output), outputLen - VARSTR_HEADER_SIZE, &len);
          if (!ret) {
            return TSDB_CODE_FAILED;
          }
          varDataSetLen(output, len);
        }
        break;
      }
      default: {
        return TSDB_CODE_FAILED;
      }
    }

    colDataAppend(pOutput->columnData, i, output, false);
    if (IS_VAR_DATA_TYPE(inputType)) {
      input  += varDataTLen(input);
    } else {
      input  += tDataTypes[inputType].bytes;
    }
    if (IS_VAR_DATA_TYPE(outputType)) {
      output += varDataTLen(output);
    } else {
      output += tDataTypes[outputType].bytes;
    }
  }

  pOutput->numOfRows = pInput->numOfRows;
  taosMemoryFree(outputBuf);
  return TSDB_CODE_SUCCESS;
}
G
Ganlin Zhao 已提交
803

804 805 806
int32_t atanFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doScalarFunctionUnique(pInput, inputNum, pOutput, atan);
}
807

808 809 810
int32_t sinFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doScalarFunctionUnique(pInput, inputNum, pOutput, sin);
}
811

812 813 814
int32_t cosFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doScalarFunctionUnique(pInput, inputNum, pOutput, cos);
}
815

816 817 818
int32_t tanFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doScalarFunctionUnique(pInput, inputNum, pOutput, tan);
}
819

820 821 822
int32_t asinFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doScalarFunctionUnique(pInput, inputNum, pOutput, asin);
}
823

824 825 826
int32_t acosFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doScalarFunctionUnique(pInput, inputNum, pOutput, acos);
}
H
Haojun Liao 已提交
827

G
Ganlin Zhao 已提交
828 829 830 831 832 833 834 835
int32_t powFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doScalarFunctionUnique2(pInput, inputNum, pOutput, pow);
}

int32_t logFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doScalarFunctionUnique2(pInput, inputNum, pOutput, tlog);
}

836 837 838
int32_t sqrtFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doScalarFunctionUnique(pInput, inputNum, pOutput, sqrt);
}
839

840 841 842 843 844 845 846 847 848 849
int32_t ceilFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doScalarFunction(pInput, inputNum, pOutput, ceilf, ceil);
}

int32_t floorFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doScalarFunction(pInput, inputNum, pOutput, floorf, floor);
}

int32_t roundFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doScalarFunction(pInput, inputNum, pOutput, roundf, round);
850 851
}

G
Ganlin Zhao 已提交
852 853
int32_t lowerFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doCaseConvFunction(pInput, inputNum, pOutput, tolower);
854 855
}

G
Ganlin Zhao 已提交
856 857
int32_t upperFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doCaseConvFunction(pInput, inputNum, pOutput, toupper);
858 859
}

G
Ganlin Zhao 已提交
860 861
int32_t ltrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doTrimFunction(pInput, inputNum, pOutput, tltrim);
862 863
}

G
Ganlin Zhao 已提交
864 865
int32_t rtrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doTrimFunction(pInput, inputNum, pOutput, trtrim);
866 867
}

G
Ganlin Zhao 已提交
868 869 870
int32_t lengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doLengthFunction(pInput, inputNum, pOutput, tlength);
}
871

G
Ganlin Zhao 已提交
872 873
int32_t charLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  return doLengthFunction(pInput, inputNum, pOutput, tcharlength);
874 875
}

H
Haojun Liao 已提交
876
#if 0
877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940
static void reverseCopy(char* dest, const char* src, int16_t type, int32_t numOfRows) {
  switch(type) {
    case TSDB_DATA_TYPE_TINYINT:
    case TSDB_DATA_TYPE_UTINYINT:{
      int8_t* p = (int8_t*) dest;
      int8_t* pSrc = (int8_t*) src;

      for(int32_t i = 0; i < numOfRows; ++i) {
        p[i] = pSrc[numOfRows - i - 1];
      }
      return;
    }

    case TSDB_DATA_TYPE_SMALLINT:
    case TSDB_DATA_TYPE_USMALLINT:{
      int16_t* p = (int16_t*) dest;
      int16_t* pSrc = (int16_t*) src;

      for(int32_t i = 0; i < numOfRows; ++i) {
        p[i] = pSrc[numOfRows - i - 1];
      }
      return;
    }
    case TSDB_DATA_TYPE_INT:
    case TSDB_DATA_TYPE_UINT: {
      int32_t* p = (int32_t*) dest;
      int32_t* pSrc = (int32_t*) src;

      for(int32_t i = 0; i < numOfRows; ++i) {
        p[i] = pSrc[numOfRows - i - 1];
      }
      return;
    }
    case TSDB_DATA_TYPE_BIGINT:
    case TSDB_DATA_TYPE_UBIGINT: {
      int64_t* p = (int64_t*) dest;
      int64_t* pSrc = (int64_t*) src;

      for(int32_t i = 0; i < numOfRows; ++i) {
        p[i] = pSrc[numOfRows - i - 1];
      }
      return;
    }
    case TSDB_DATA_TYPE_FLOAT: {
      float* p = (float*) dest;
      float* pSrc = (float*) src;

      for(int32_t i = 0; i < numOfRows; ++i) {
        p[i] = pSrc[numOfRows - i - 1];
      }
      return;
    }
    case TSDB_DATA_TYPE_DOUBLE: {
      double* p = (double*) dest;
      double* pSrc = (double*) src;

      for(int32_t i = 0; i < numOfRows; ++i) {
        p[i] = pSrc[numOfRows - i - 1];
      }
      return;
    }
    default: assert(0);
  }
}
H
Haojun Liao 已提交
941
#endif
942

943 944 945 946 947 948 949 950
bool getTimePseudoFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
  pEnv->calcMemSize = sizeof(int64_t);
  return true;
}

int32_t qStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  ASSERT(inputNum == 1);
  colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t *)colDataGetData(pInput->columnData, 0));
951
  return TSDB_CODE_SUCCESS;
952 953 954 955 956
}

int32_t qEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  ASSERT(inputNum == 1);
  colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t *)colDataGetData(pInput->columnData, 1));
957
  return TSDB_CODE_SUCCESS;
958 959 960 961 962
}

int32_t winDurFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  ASSERT(inputNum == 1);
  colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t *)colDataGetData(pInput->columnData, 2));
963
  return TSDB_CODE_SUCCESS;
964 965 966 967 968 969 970 971 972 973 974 975
}

int32_t winStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  ASSERT(inputNum == 1);
  colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t*) colDataGetData(pInput->columnData, 3));
  return TSDB_CODE_SUCCESS;
}

int32_t winEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
  ASSERT(inputNum == 1);
  colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t*) colDataGetData(pInput->columnData, 4));
  return TSDB_CODE_SUCCESS;
976
}