scalar.c 49.7 KB
Newer Older
H
Hongze Cheng 已提交
1
#include "scalar.h"
D
dapan1121 已提交
2 3
#include "function.h"
#include "functionMgt.h"
H
Haojun Liao 已提交
4 5
#include "nodes.h"
#include "querynodes.h"
D
dapan1121 已提交
6
#include "sclInt.h"
H
Haojun Liao 已提交
7 8
#include "sclvector.h"
#include "tcommon.h"
H
Hongze Cheng 已提交
9
#include "tcompare.h"
H
Haojun Liao 已提交
10
#include "tdatablock.h"
D
fix bug  
dapan1121 已提交
11
#include "ttime.h"
H
Hongze Cheng 已提交
12
#include "tudf.h"
D
dapan1121 已提交
13

D
dapan1121 已提交
14
/**
 * Reports how many operands an operator takes.
 *
 * @param type operator kind
 * @return 1 for the IS [NOT] NULL/TRUE/FALSE/UNKNOWN predicates and unary
 *         minus, 2 for every other operator.
 */
int32_t scalarGetOperatorParamNum(EOperatorType type) {
  switch (type) {
    case OP_TYPE_IS_NULL:
    case OP_TYPE_IS_NOT_NULL:
    case OP_TYPE_IS_TRUE:
    case OP_TYPE_IS_NOT_TRUE:
    case OP_TYPE_IS_FALSE:
    case OP_TYPE_IS_NOT_FALSE:
    case OP_TYPE_IS_UNKNOWN:
    case OP_TYPE_IS_NOT_UNKNOWN:
    case OP_TYPE_MINUS:
      return 1;
    default:
      return 2;
  }
}

H
Hongze Cheng 已提交
24 25 26 27
/**
 * Converts a value node holding a textual time into a TIMESTAMP value node.
 *
 * The text in datum.p is parsed with the given precision into datum.i. On
 * success the text buffer is released and the node's result type is switched
 * to TSDB_DATA_TYPE_TIMESTAMP; on failure the node's type is left as it was
 * and the conversion error is returned.
 */
int32_t sclConvertToTsValueNode(int8_t precision, SValueNode *valueNode) {
  // Remember the text pointer first: the conversion writes datum.i.
  char *rawText = valueNode->datum.p;

  int32_t rc = convertStringToTimestamp(valueNode->node.resType.type, rawText, precision, &valueNode->datum.i);
  if (TSDB_CODE_SUCCESS != rc) {
    return rc;
  }

  taosMemoryFree(rawText);
  valueNode->typeData = valueNode->datum.i;

  valueNode->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
  valueNode->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes;

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
40 41
/**
 * Allocates a zero-initialised SColumnInfoData described by pType, reserves
 * room for numOfRows rows and attaches it to pParam, marking the column as
 * owned by the param (colAlloced = true).
 *
 * @param pType     type/bytes/scale/precision copied into the column info
 * @param numOfRows number of rows to reserve
 * @param pParam    receives the new column; untouched on failure
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY (also stored in
 *         terrno) when either allocation step fails.
 */
int32_t sclCreateColumnInfoData(SDataType *pType, int32_t numOfRows, SScalarParam *pParam) {
  SColumnInfoData *pColumnData = taosMemoryCalloc(1, sizeof(SColumnInfoData));
  if (pColumnData == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return terrno;
  }

  pColumnData->info.type = pType->type;
  pColumnData->info.bytes = pType->bytes;
  pColumnData->info.scale = pType->scale;
  pColumnData->info.precision = pType->precision;

  int32_t code = colInfoDataEnsureCapacity(pColumnData, numOfRows);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // Release any buffers attached before the failure, not just the struct.
    colDataDestroy(pColumnData);
    taosMemoryFree(pColumnData);
    return terrno;
  }

  pParam->columnData = pColumnData;
  pParam->colAlloced = true;
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
64
/**
 * Converts the constant held by pValueNode into the column type already
 * configured on out->columnData.
 *
 * A temporary one-row column is built from the node's value, converted into
 * `out` with vectorConvertSingleColImpl, and then released.
 *
 * @param pValueNode source constant
 * @param out        destination; out->columnData must be allocated and have
 *                   its info (type/bytes) set by the caller
 * @param overflow   passed through to vectorConvertSingleColImpl
 * @return TSDB_CODE_SUCCESS or the first error encountered.
 */
int32_t sclConvertValueToSclParam(SValueNode* pValueNode, SScalarParam* out, int32_t* overflow) {
  if (NULL == out || NULL == out->columnData) {
    sclError("invalid output param for value conversion, out:%p", out);
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SScalarParam in = {.numOfRows = 1};
  int32_t      code = sclCreateColumnInfoData(&pValueNode->node.resType, 1, &in);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  colDataAppend(in.columnData, 0, nodesGetValueFromNode(pValueNode), false);

  // The conversion writes row 0 of the output, so the reservation must succeed.
  code = colInfoDataEnsureCapacity(out->columnData, 1);
  if (code == TSDB_CODE_SUCCESS) {
    code = vectorConvertSingleColImpl(&in, out, overflow, -1, -1);
  }

  sclFreeParam(&in);

  return code;
}

D
dapan1121 已提交
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
/**
 * Extends pDst to the row count of the first data block in pBlockList by
 * running the OP_TYPE_ASSIGN binary operator with pSrc as the right operand.
 *
 * @param pDst       destination param; its column is grown when it has fewer
 *                   rows than the block
 * @param pSrc       source param passed as the right operand of the assignment
 * @param pBlockList block list; only element 0 is consulted for the row count
 * @return TSDB_CODE_SUCCESS, or the error from growing the destination column.
 */
int32_t sclExtendResRows(SScalarParam *pDst, SScalarParam *pSrc, SArray *pBlockList) {
  SSDataBlock *pb = taosArrayGetP(pBlockList, 0);

  // The left operand only carries the target row count, so a stack object
  // is enough and removes an allocation failure path.
  SScalarParam left = {.numOfRows = pb->info.rows};

  if (pDst->numOfRows < pb->info.rows) {
    int32_t code = colInfoDataEnsureCapacity(pDst->columnData, pb->info.rows);
    if (code != TSDB_CODE_SUCCESS) {
      sclError("extend result column to %d rows failed", (int32_t)pb->info.rows);
      SCL_ERR_RET(code);
    }
  }

  _bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(OP_TYPE_ASSIGN);
  OperatorFn(&left, pSrc, pDst, TSDB_ORDER_ASC);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
102 103 104 105 106 107 108
/**
 * Builds a hash set from the constants of a node list.
 *
 * Every value node in the list is inserted as a key (no payload). Values whose
 * type differs from `type` are first converted through a scratch one-row
 * column; values that overflow during conversion are skipped.
 *
 * @param data  receives the new SHashObj* on success; untouched on failure
 * @param pNode the SNodeListNode to read values from
 * @param type  target data type of the set members
 * @return TSDB_CODE_SUCCESS, or an error code (the partially built set is
 *         destroyed in that case).
 */
int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type) {
  SHashObj *pObj = taosHashInit(256, taosGetDefaultHashFunction(type), true, false);
  if (NULL == pObj) {
    sclError("taosHashInit failed, size:%d", 256);
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  taosHashSetEqualFp(pObj, taosGetDefaultEqualFunction(type));

  int32_t        code = 0;
  int32_t        len = 0;
  void          *buf = NULL;
  SNodeListNode *nodeList = (SNodeListNode *)pNode;
  SListCell     *cell = nodeList->pNodeList->pHead;
  SScalarParam   out = {.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData))};
  if (NULL == out.columnData) {
    sclError("calloc %d failed", (int32_t)sizeof(SColumnInfoData));
    SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  for (int32_t i = 0; i < nodeList->pNodeList->length; ++i, cell = cell->pNext) {
    SValueNode *valueNode = (SValueNode *)cell->pNode;
    int32_t     overflow = 0;

    if (valueNode->node.resType.type != type) {
      // Describe the scratch column in the target type before converting.
      out.columnData->info.type = type;
      if (IS_VAR_DATA_TYPE(type)) {
        if (IS_VAR_DATA_TYPE(valueNode->node.resType.type)) {
          out.columnData->info.bytes = valueNode->node.resType.bytes * TSDB_NCHAR_SIZE;
        } else {
          out.columnData->info.bytes = 64 * TSDB_NCHAR_SIZE;
        }
      } else {
        out.columnData->info.bytes = tDataTypes[type].bytes;
      }

      code = sclConvertValueToSclParam(valueNode, &out, &overflow);
      if (code != TSDB_CODE_SUCCESS) {
        SCL_ERR_JRET(code);
      }

      if (!overflow) {
        if (IS_VAR_DATA_TYPE(type)) {
          buf = colDataGetVarData(out.columnData, 0);
          len = varDataTLen(buf);
        } else {
          len = tDataTypes[type].bytes;
          buf = out.columnData->pData;
        }
      }
    } else {
      buf = nodesGetValueFromNode(valueNode);
      if (IS_VAR_DATA_TYPE(type)) {
        len = varDataTLen(buf);
      } else {
        len = valueNode->node.resType.bytes;
      }
    }

    // Overflowed values cannot be represented in the target type: skip them.
    if (!overflow && taosHashPut(pObj, buf, (size_t)len, NULL, 0)) {
      sclError("taosHashPut to set failed");
      SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    // Reset the scratch column in place for the next value (also after an
    // overflow), instead of freeing and re-allocating the struct.
    colDataDestroy(out.columnData);
    *out.columnData = (SColumnInfoData){0};
  }

  *data = pObj;

  colDataDestroy(out.columnData);
  taosMemoryFreeClear(out.columnData);
  return TSDB_CODE_SUCCESS;

_return:

  if (NULL != out.columnData) {
    colDataDestroy(out.columnData);
    taosMemoryFreeClear(out.columnData);
  }
  taosHashCleanup(pObj);
  SCL_RET(code);
}

D
dapan1121 已提交
188 189
/**
 * Releases every SScalarParam stored in the result hash and then destroys
 * the hash itself.
 */
void sclFreeRes(SHashObj *res) {
  for (void *entry = taosHashIterate(res, NULL); entry != NULL; entry = taosHashIterate(res, entry)) {
    sclFreeParam((SScalarParam *)entry);
  }

  taosHashCleanup(res);
}

D
dapan1121 已提交
202
/**
 * Releases the resources held by a scalar param, but only when the param is
 * flagged as owning them (colAlloced). The SScalarParam struct itself is not
 * freed. NULL is accepted and ignored.
 */
void sclFreeParam(SScalarParam *param) {
  bool ownsResources = (param != NULL) && param->colAlloced;
  if (!ownsResources) {
    return;
  }

  if (NULL != param->columnData) {
    colDataDestroy(param->columnData);
    taosMemoryFreeClear(param->columnData);
  }

  if (NULL != param->pHashFilter) {
    taosHashCleanup(param->pHashFilter);
    param->pHashFilter = NULL;
  }
}

D
dapan1121 已提交
218 219 220 221
/**
 * Duplicates the raw value of a value node into a freshly allocated buffer of
 * resType.bytes bytes.
 *
 * @param pNode source value node
 * @param res   receives the new buffer; left untouched when the node's type is
 *              TSDB_DATA_TYPE_NULL, set to NULL when the allocation fails
 * @return TSDB_CODE_SUCCESS or TSDB_CODE_QRY_OUT_OF_MEMORY.
 */
int32_t sclCopyValueNodeValue(SValueNode *pNode, void **res) {
  if (pNode->node.resType.type == TSDB_DATA_TYPE_NULL) {
    return TSDB_CODE_SUCCESS;
  }

  void *copy = taosMemoryMalloc(pNode->node.resType.bytes);
  *res = copy;
  if (copy == NULL) {
    sclError("malloc %d failed", pNode->node.resType.bytes);
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  memcpy(copy, nodesGetValueFromNode(pNode), pNode->node.resType.bytes);
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
233 234 235 236 237 238
/**
 * Releases each of the paramNum params in the array and then the array
 * itself. A NULL array is ignored.
 */
void sclFreeParamList(SScalarParam *param, int32_t paramNum) {
  if (param == NULL) {
    return;
  }

  int32_t idx = 0;
  while (idx < paramNum) {
    sclFreeParam(&param[idx]);
    ++idx;
  }

  taosMemoryFree(param);
}

246 247 248 249 250 251
/**
 * Narrows the result type of a constant to the smallest type that still holds
 * its value, and stores the narrowed value into typeData.
 *
 * BIGINT tries TINYINT, SMALLINT, then INT. UBIGINT tries the 8/16/32-bit
 * widths in turn, choosing the signed type when the value also fits it and
 * the unsigned type otherwise. DOUBLE becomes FLOAT when FLT_EQUAL accepts the
 * round trip. Any other type, or a value that fits none of the narrower
 * types, is left unchanged.
 */
void sclDowngradeValueType(SValueNode *valueNode) {
  switch (valueNode->node.resType.type) {
    case TSDB_DATA_TYPE_BIGINT: {
      // Narrow once up front; a width fits when the round trip is lossless.
      int8_t  asI8 = valueNode->datum.i;
      int16_t asI16 = valueNode->datum.i;
      int32_t asI32 = valueNode->datum.i;

      if (asI8 == valueNode->datum.i) {
        valueNode->node.resType.type = TSDB_DATA_TYPE_TINYINT;
        *(int8_t *)&valueNode->typeData = asI8;
      } else if (asI16 == valueNode->datum.i) {
        valueNode->node.resType.type = TSDB_DATA_TYPE_SMALLINT;
        *(int16_t *)&valueNode->typeData = asI16;
      } else if (asI32 == valueNode->datum.i) {
        valueNode->node.resType.type = TSDB_DATA_TYPE_INT;
        *(int32_t *)&valueNode->typeData = asI32;
      }
      break;
    }
    case TSDB_DATA_TYPE_UBIGINT: {
      uint8_t  asU8 = valueNode->datum.i;
      uint16_t asU16 = valueNode->datum.i;
      uint32_t asU32 = valueNode->datum.i;

      if (asU8 == valueNode->datum.i) {
        int8_t asI8 = valueNode->datum.i;
        if (asI8 == valueNode->datum.i) {
          valueNode->node.resType.type = TSDB_DATA_TYPE_TINYINT;
          *(int8_t *)&valueNode->typeData = asI8;
        } else {
          valueNode->node.resType.type = TSDB_DATA_TYPE_UTINYINT;
          *(uint8_t *)&valueNode->typeData = asU8;
        }
      } else if (asU16 == valueNode->datum.i) {
        int16_t asI16 = valueNode->datum.i;
        if (asI16 == valueNode->datum.i) {
          valueNode->node.resType.type = TSDB_DATA_TYPE_SMALLINT;
          *(int16_t *)&valueNode->typeData = asI16;
        } else {
          valueNode->node.resType.type = TSDB_DATA_TYPE_USMALLINT;
          *(uint16_t *)&valueNode->typeData = asU16;
        }
      } else if (asU32 == valueNode->datum.i) {
        int32_t asI32 = valueNode->datum.i;
        if (asI32 == valueNode->datum.i) {
          valueNode->node.resType.type = TSDB_DATA_TYPE_INT;
          *(int32_t *)&valueNode->typeData = asI32;
        } else {
          valueNode->node.resType.type = TSDB_DATA_TYPE_UINT;
          *(uint32_t *)&valueNode->typeData = asU32;
        }
      }
      break;
    }
    case TSDB_DATA_TYPE_DOUBLE: {
      float asFloat = valueNode->datum.d;
      if (FLT_EQUAL(asFloat, valueNode->datum.d)) {
        valueNode->node.resType.type = TSDB_DATA_TYPE_FLOAT;
        *(float *)&valueNode->typeData = asFloat;
      }
      break;
    }
    default:
      break;
  }
}

H
Hongze Cheng 已提交
322
/**
 * Fills `param` for one operand node and folds its row count into *rowNum.
 *
 * - LEFT_VALUE: only the row count of block 0 is recorded.
 * - VALUE: a one-row column owned by the param is created from the constant.
 * - NODE_LIST: a hash set of the list's values is built and the param is
 *   stored in ctx->pRes; the local param is then marked as not owning it.
 * - COLUMN: the param points at the referenced column of the matching block.
 * - FUNCTION / OPERATOR / LOGIC_CONDITION / CASE_WHEN: the previously
 *   computed result is copied from ctx->pRes and marked as not owned.
 *
 * @param node   operand node
 * @param param  param to fill; expected to be zero-initialised
 * @param ctx    scalar context (block list, result hash, operand types)
 * @param rowNum in/out row count; raised to this operand's row count when
 *               that is larger
 * @return TSDB_CODE_SUCCESS or an error code.
 */
int32_t sclInitParam(SNode *node, SScalarParam *param, SScalarCtx *ctx, int32_t *rowNum) {
  switch (nodeType(node)) {
    case QUERY_NODE_LEFT_VALUE: {
      SSDataBlock *pb = taosArrayGetP(ctx->pBlockList, 0);
      param->numOfRows = pb->info.rows;
      break;
    }
    case QUERY_NODE_VALUE: {
      SValueNode *valueNode = (SValueNode *)node;

      ASSERT(param->columnData == NULL);
      param->numOfRows = 1;
      // Must not be ignored: on failure columnData stays NULL and the
      // appends below would be handed a NULL column.
      SCL_ERR_RET(sclCreateColumnInfoData(&valueNode->node.resType, 1, param));
      if (TSDB_DATA_TYPE_NULL == valueNode->node.resType.type || valueNode->isNull) {
        colDataAppendNULL(param->columnData, 0);
      } else {
        colDataAppend(param->columnData, 0, nodesGetValueFromNode(valueNode), false);
      }
      break;
    }
    case QUERY_NODE_NODE_LIST: {
      SNodeListNode *nodeList = (SNodeListNode *)node;
      if (LIST_LENGTH(nodeList->pNodeList) <= 0) {
        sclError("invalid length in nodeList, length:%d", LIST_LENGTH(nodeList->pNodeList));
        SCL_RET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      int32_t type = vectorGetConvertType(ctx->type.selfType, ctx->type.peerType);
      if (type == 0) {
        type = nodeList->dataType.type;
      }

      SCL_ERR_RET(scalarGenerateSetFromList((void **)&param->pHashFilter, node, type));
      param->hashValueType = type;
      // The copy stored in ctx->pRes owns the hash filter ...
      param->colAlloced = true;
      if (taosHashPut(ctx->pRes, &node, POINTER_BYTES, param, sizeof(*param))) {
        taosHashCleanup(param->pHashFilter);
        param->pHashFilter = NULL;
        sclError("taosHashPut nodeList failed, size:%d", (int32_t)sizeof(*param));
        return TSDB_CODE_QRY_OUT_OF_MEMORY;
      }
      // ... while the caller's copy only borrows it.
      param->colAlloced = false;
      break;
    }
    case QUERY_NODE_COLUMN: {
      if (NULL == ctx->pBlockList) {
        sclError("invalid node type for constant calculating, type:%d, src:%p", nodeType(node), ctx->pBlockList);
        SCL_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
      }

      SColumnNode *ref = (SColumnNode *)node;

      // Locate the block whose id matches the column's dataBlockId.
      int32_t index = -1;
      for (int32_t i = 0; i < taosArrayGetSize(ctx->pBlockList); ++i) {
        SSDataBlock *pb = taosArrayGetP(ctx->pBlockList, i);
        if (pb->info.blockId == ref->dataBlockId) {
          index = i;
          break;
        }
      }

      if (index == -1) {
        sclError("column tupleId is too big, tupleId:%d, dataBlockNum:%d", ref->dataBlockId,
                 (int32_t)taosArrayGetSize(ctx->pBlockList));
        SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      SSDataBlock *block = *(SSDataBlock **)taosArrayGet(ctx->pBlockList, index);
      if (NULL == block) {
        SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      if (ref->slotId >= taosArrayGetSize(block->pDataBlock)) {
        sclError("column slotId is too big, slodId:%d, dataBlockSize:%d", ref->slotId,
                 (int32_t)taosArrayGetSize(block->pDataBlock));
        SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      SColumnInfoData *columnData = (SColumnInfoData *)taosArrayGet(block->pDataBlock, ref->slotId);
#if TAG_FILTER_DEBUG
      qDebug("tagfilter column info, slotId:%d, colId:%d, type:%d", ref->slotId, columnData->info.colId,
             columnData->info.type);
#endif
      param->numOfRows = block->info.rows;
      param->columnData = columnData;
      break;
    }
    case QUERY_NODE_FUNCTION:
    case QUERY_NODE_OPERATOR:
    case QUERY_NODE_LOGIC_CONDITION:
    case QUERY_NODE_CASE_WHEN: {
      SScalarParam *res = (SScalarParam *)taosHashGet(ctx->pRes, &node, POINTER_BYTES);
      if (NULL == res) {
        sclError("no result for node, type:%d, node:%p", nodeType(node), node);
        SCL_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
      }
      *param = *res;
      // The result hash keeps ownership of the copied column.
      param->colAlloced = false;
      break;
    }
    default:
      break;
  }

  if (param->numOfRows > *rowNum) {
    // Row counts may only differ when one side is a single (constant) row.
    if ((1 != param->numOfRows) && (1 < *rowNum)) {
      sclError("different row nums, rowNum:%d, newRowNum:%d", *rowNum, param->numOfRows);
      SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    *rowNum = param->numOfRows;
  }

  param->param = ctx->param;
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
439 440
/**
 * Allocates and initialises the param array for a parameter list.
 *
 * With a NULL list a single param is produced whose row count comes from
 * block 0 (or 1 when there is no block list). During constant calculation
 * only constant nodes are initialised, and each one is erased from the list
 * afterwards.
 *
 * @param pParams    receives the array; set to NULL when the resulting row
 *                   count is 0
 * @param pParamList parameter nodes, may be NULL
 * @param ctx        scalar context
 * @param paramNum   receives the number of array slots
 * @param rowNum     in/out row count across all params
 * @return TSDB_CODE_SUCCESS or an error code; on error nothing is returned in
 *         *pParams and every param initialised so far is released.
 */
int32_t sclInitParamList(SScalarParam **pParams, SNodeList *pParamList, SScalarCtx *ctx, int32_t *paramNum,
                         int32_t *rowNum) {
  int32_t code = 0;
  if (NULL == pParamList) {
    if (ctx->pBlockList) {
      SSDataBlock *pBlock = taosArrayGetP(ctx->pBlockList, 0);
      *rowNum = pBlock->info.rows;
    } else {
      *rowNum = 1;
    }

    *paramNum = 1;
  } else {
    *paramNum = pParamList->length;
  }

  SScalarParam *paramList = taosMemoryCalloc(*paramNum, sizeof(SScalarParam));
  if (NULL == paramList) {
    sclError("calloc %d failed", (int32_t)((*paramNum) * sizeof(SScalarParam)));
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (pParamList) {
    SNode  *tnode = NULL;
    int32_t i = 0;
    if (SCL_IS_CONST_CALC(ctx)) {
      WHERE_EACH(tnode, pParamList) {
        if (!SCL_IS_CONST_NODE(tnode)) {
          WHERE_NEXT;
        } else {
          SCL_ERR_JRET(sclInitParam(tnode, &paramList[i], ctx, rowNum));
          ERASE_NODE(pParamList);
        }

        ++i;
      }
    } else {
      FOREACH(tnode, pParamList) {
        SCL_ERR_JRET(sclInitParam(tnode, &paramList[i], ctx, rowNum));
        ++i;
      }
    }
  } else {
    paramList[0].numOfRows = *rowNum;
  }

  if (0 == *rowNum) {
    taosMemoryFreeClear(paramList);
  }

  *pParams = paramList;
  return TSDB_CODE_SUCCESS;

_return:
  // Release the columns owned by already-initialised params as well as the
  // array; untouched slots are zeroed and therefore skipped.
  sclFreeParamList(paramList, *paramNum);
  SCL_RET(code);
}

D
dapan1121 已提交
497 498 499 500
/**
 * Resolves the data type an operand node evaluates to.
 *
 * Value, node-list and column nodes report their declared type. Function,
 * operator and logic-condition nodes report the type of their stored result
 * column in ctx->pRes.
 *
 * @return the data type, or -1 for a NULL node, an unhandled node kind, or a
 *         node without a stored result.
 */
int32_t sclGetNodeType(SNode *pNode, SScalarCtx *ctx) {
  int32_t resolved = -1;
  if (pNode == NULL) {
    return resolved;
  }

  switch ((int)nodeType(pNode)) {
    case QUERY_NODE_VALUE:
      resolved = ((SValueNode *)pNode)->node.resType.type;
      break;
    case QUERY_NODE_NODE_LIST:
      resolved = ((SNodeListNode *)pNode)->dataType.type;
      break;
    case QUERY_NODE_COLUMN:
      resolved = ((SColumnNode *)pNode)->node.resType.type;
      break;
    case QUERY_NODE_FUNCTION:
    case QUERY_NODE_OPERATOR:
    case QUERY_NODE_LOGIC_CONDITION: {
      SScalarParam *stored = (SScalarParam *)taosHashGet(ctx->pRes, &pNode, POINTER_BYTES);
      if (stored != NULL) {
        resolved = stored->columnData->info.type;
      } else {
        sclError("no result for node, type:%d, node:%p", nodeType(pNode), pNode);
      }
      break;
    }
    default:
      break;
  }

  return resolved;
}

/**
 * Records the operator's result type and the types of its left (self) and
 * right (peer) operands in ctx->type.
 */
void sclSetOperatorValueType(SOperatorNode *node, SScalarCtx *ctx) {
  int32_t leftType = sclGetNodeType(node->pLeft, ctx);
  int32_t rightType = sclGetNodeType(node->pRight, ctx);

  ctx->type.opResType = node->node.resType.type;
  ctx->type.selfType = leftType;
  ctx->type.peerType = rightType;
}

D
dapan1121 已提交
536 537
/**
 * Allocates and initialises the one or two params of an operator node.
 *
 * @param pParams receives the param array (1 or 2 entries, see
 *                scalarGetOperatorParamNum)
 * @param node    operator node; pLeft is mandatory, pRight is mandatory for
 *                binary operators
 * @param ctx     scalar context; ctx->type is updated with the operand types
 * @param rowNum  in/out row count across the operands
 * @return TSDB_CODE_SUCCESS or an error code; on error every param initialised
 *         so far is released together with the array.
 */
int32_t sclInitOperatorParams(SScalarParam **pParams, SOperatorNode *node, SScalarCtx *ctx, int32_t *rowNum) {
  int32_t code = 0;
  int32_t paramNum = scalarGetOperatorParamNum(node->opType);
  if (NULL == node->pLeft || (paramNum == 2 && NULL == node->pRight)) {
    sclError("invalid operation node, left:%p, right:%p", node->pLeft, node->pRight);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SScalarParam *paramList = taosMemoryCalloc(paramNum, sizeof(SScalarParam));
  if (NULL == paramList) {
    sclError("calloc %d failed", (int32_t)(paramNum * sizeof(SScalarParam)));
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  sclSetOperatorValueType(node, ctx);

  SCL_ERR_JRET(sclInitParam(node->pLeft, &paramList[0], ctx, rowNum));
  if (paramNum > 1) {
    // The right operand sees itself as "self" and the left one as "peer".
    TSWAP(ctx->type.selfType, ctx->type.peerType);
    SCL_ERR_JRET(sclInitParam(node->pRight, &paramList[1], ctx, rowNum));
  }

  *pParams = paramList;
  return TSDB_CODE_SUCCESS;

_return:
  // Also releases a left operand's owned column when the right one failed.
  sclFreeParamList(paramList, paramNum);
  SCL_RET(code);
}

D
dapan1121 已提交
566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581
/**
 * Allocates a SScalarParam and initialises it for `node`.
 *
 * @param node node to resolve; when NULL nothing is done and *res is untouched
 * @param ctx  scalar context
 * @param res  receives the heap-allocated param on success; the caller must
 *             release its contents and the struct. Set to NULL on failure.
 * @return TSDB_CODE_SUCCESS or an error code.
 */
int32_t sclGetNodeRes(SNode* node, SScalarCtx *ctx, SScalarParam **res) {
  if (NULL == node) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t rowNum = 0;
  *res = taosMemoryCalloc(1, sizeof(**res));
  if (NULL == *res) {
    SCL_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  int32_t code = sclInitParam(node, *res, ctx, &rowNum);
  if (TSDB_CODE_SUCCESS != code) {
    // Do not hand a half-initialised param back to the caller.
    sclFreeParam(*res);
    taosMemoryFree(*res);
    *res = NULL;
    SCL_ERR_RET(code);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
582
/**
 * Evaluates one row of a `CASE <expr> WHEN ... THEN ... [ELSE ...] END`.
 *
 * Starting at pCell, each WHEN value is compared for equality with the CASE
 * value at rowIdx; the first match writes the corresponding THEN value into
 * `output`. Without a match the ELSE value, or NULL when there is no ELSE, is
 * written. When row 0 was produced from single-row inputs only and the block
 * has more rows, the result is extended to all rows and *complete is set.
 *
 * @param pList     unused here
 * @param pCell     first WHEN/THEN cell to examine
 * @param pCase     evaluated CASE expression
 * @param pElse     evaluated ELSE expression, may be NULL
 * @param pComp     scratch param receiving the comparison result
 * @param output    destination column
 * @param rowIdx    row being evaluated
 * @param totalRows total number of rows to produce
 * @param complete  set to true when the whole output has been filled
 * @return TSDB_CODE_SUCCESS or an error code.
 */
int32_t sclWalkCaseWhenList(SScalarCtx *ctx, SNodeList* pList, struct SListCell* pCell, SScalarParam *pCase, SScalarParam *pElse, SScalarParam *pComp, SScalarParam *output, int32_t rowIdx, int32_t totalRows, bool *complete) {
  SNode *node = NULL;
  SWhenThenNode* pWhenThen = NULL;
  SScalarParam *pWhen = NULL;
  SScalarParam *pThen = NULL;
  int32_t code = 0;

  for (SListCell* cell = pCell; (NULL != cell ? (node = cell->pNode, true) : (node = NULL, false)); cell = cell->pNext) {
    pWhenThen = (SWhenThenNode*)node;
    pWhen = NULL;
    pThen = NULL;

    // Jump to the common cleanup on failure so nothing acquired here leaks.
    SCL_ERR_JRET(sclGetNodeRes(pWhenThen->pWhen, ctx, &pWhen));
    SCL_ERR_JRET(sclGetNodeRes(pWhenThen->pThen, ctx, &pThen));

    vectorCompareImpl(pCase, pWhen, pComp, rowIdx, 1, TSDB_ORDER_ASC, OP_TYPE_EQUAL);

    bool *equal = (bool*)colDataGetData(pComp->columnData, rowIdx);
    if (*equal) {
      colDataAppend(output->columnData, rowIdx, colDataGetData(pThen->columnData, (pThen->numOfRows > 1 ? rowIdx : 0)), colDataIsNull_s(pThen->columnData, (pThen->numOfRows > 1 ? rowIdx : 0)));

      if (0 == rowIdx && 1 == pCase->numOfRows && 1 == pWhen->numOfRows && 1 == pThen->numOfRows && totalRows > 1) {
        SCL_ERR_JRET(sclExtendResRows(output, output, ctx->pBlockList));
        *complete = true;
      }

      goto _return;
    }

    // No match: release this pair (contents and struct) before moving on.
    sclFreeParam(pWhen);
    sclFreeParam(pThen);
    taosMemoryFreeClear(pWhen);
    taosMemoryFreeClear(pThen);
  }

  if (pElse) {
    colDataAppend(output->columnData, rowIdx, colDataGetData(pElse->columnData, (pElse->numOfRows > 1 ? rowIdx : 0)), colDataIsNull_s(pElse->columnData, (pElse->numOfRows > 1 ? rowIdx : 0)));

    if (0 == rowIdx && 1 == pCase->numOfRows && 1 == pElse->numOfRows && totalRows > 1) {
      SCL_ERR_JRET(sclExtendResRows(output, output, ctx->pBlockList));
      *complete = true;
    }

    goto _return;
  }

  colDataAppend(output->columnData, rowIdx, NULL, true);

  if (0 == rowIdx && 1 == pCase->numOfRows && totalRows > 1) {
    SCL_ERR_JRET(sclExtendResRows(output, output, ctx->pBlockList));
    *complete = true;
  }

_return:

  sclFreeParam(pWhen);
  sclFreeParam(pThen);
  // sclFreeParam only releases the contents; the structs are heap-allocated.
  taosMemoryFreeClear(pWhen);
  taosMemoryFreeClear(pThen);

  SCL_RET(code);
}

D
dapan1121 已提交
636 637
/**
 * Evaluates one row of a searched `CASE WHEN <cond> THEN ... [ELSE ...] END`.
 *
 * Starting at pCell, the first WHEN whose boolean value is true for rowIdx
 * writes its THEN value into `output`. Without a match the ELSE value, or
 * NULL when there is no ELSE, is written. When preSingle is set, row 0 was
 * produced from single-row inputs only and the block has more rows, the
 * result is extended to all rows and *complete is set.
 *
 * @param pList     unused here
 * @param pCell     first WHEN/THEN cell to examine
 * @param pElse     evaluated ELSE expression, may be NULL
 * @param output    destination column
 * @param rowIdx    row being evaluated
 * @param totalRows total number of rows to produce
 * @param complete  set to true when the whole output has been filled
 * @param preSingle caller-supplied flag gating the row-extension shortcut
 * @return TSDB_CODE_SUCCESS or an error code.
 */
int32_t sclWalkWhenList(SScalarCtx *ctx, SNodeList* pList, struct SListCell* pCell, SScalarParam *pElse, SScalarParam *output, 
                             int32_t rowIdx, int32_t totalRows, bool *complete, bool preSingle) {
  SNode *node = NULL;
  SWhenThenNode* pWhenThen = NULL;
  SScalarParam *pWhen = NULL;
  SScalarParam *pThen = NULL;
  int32_t code = 0;

  for (SListCell* cell = pCell; (NULL != cell ? (node = cell->pNode, true) : (node = NULL, false)); cell = cell->pNext) {
    pWhenThen = (SWhenThenNode*)node;
    pWhen = NULL;
    pThen = NULL;

    SCL_ERR_JRET(sclGetNodeRes(pWhenThen->pWhen, ctx, &pWhen));
    SCL_ERR_JRET(sclGetNodeRes(pWhenThen->pThen, ctx, &pThen));

    bool *whenValue = (bool*)colDataGetData(pWhen->columnData, (pWhen->numOfRows > 1 ? rowIdx : 0));

    if (*whenValue) {
      colDataAppend(output->columnData, rowIdx, colDataGetData(pThen->columnData, (pThen->numOfRows > 1 ? rowIdx : 0)), colDataIsNull_s(pThen->columnData, (pThen->numOfRows > 1 ? rowIdx : 0)));

      if (preSingle && 0 == rowIdx && 1 == pWhen->numOfRows && 1 == pThen->numOfRows && totalRows > 1) {
        SCL_ERR_JRET(sclExtendResRows(output, output, ctx->pBlockList));
        *complete = true;
      }

      goto _return;
    }

    // No match: release this pair (contents and struct) and clear the
    // pointers so the common cleanup below does not touch them again.
    sclFreeParam(pWhen);
    sclFreeParam(pThen);
    taosMemoryFreeClear(pWhen);
    taosMemoryFreeClear(pThen);
  }

  if (pElse) {
    colDataAppend(output->columnData, rowIdx, colDataGetData(pElse->columnData, (pElse->numOfRows > 1 ? rowIdx : 0)), colDataIsNull_s(pElse->columnData, (pElse->numOfRows > 1 ? rowIdx : 0)));

    if (preSingle && 0 == rowIdx && 1 == pElse->numOfRows && totalRows > 1) {
      SCL_ERR_JRET(sclExtendResRows(output, output, ctx->pBlockList));
      *complete = true;
    }

    goto _return;
  }

  colDataAppend(output->columnData, rowIdx, NULL, true);

  if (preSingle && 0 == rowIdx && totalRows > 1) {
    SCL_ERR_JRET(sclExtendResRows(output, output, ctx->pBlockList));
    *complete = true;
  }

_return:

  sclFreeParam(pWhen);
  sclFreeParam(pThen);
  // sclFreeParam only releases the contents; the structs are heap-allocated.
  taosMemoryFreeClear(pWhen);
  taosMemoryFreeClear(pThen);

  SCL_RET(code);
}

695
/**
 * Executes a scalar function node.
 *
 * The parameter list is turned into a param array. User-defined functions are
 * dispatched to callUdfScalarFunc. Built-in functions get an output column of
 * the node's result type with one slot per row, and are then run through the
 * `process` callback obtained from fmGetScalarFuncExecFuncs. The param array
 * is released on every path after it has been created.
 *
 * @return TSDB_CODE_SUCCESS or the error reported by the failing step.
 */
int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *output) {
  SScalarParam *params = NULL;
  int32_t       rowNum = 0;
  int32_t       paramNum = 0;
  int32_t       code = 0;
  SCL_ERR_RET(sclInitParamList(&params, node->pParameterList, ctx, &paramNum, &rowNum));

  if (!fmIsUserDefinedFunc(node->funcId)) {
    SScalarFuncExecFuncs execFuncs = {0};
    code = fmGetScalarFuncExecFuncs(node->funcId, &execFuncs);
    if (code) {
      sclError("fmGetFuncExecFuncs failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
      SCL_ERR_JRET(code);
    }

    code = sclCreateColumnInfoData(&node->node.resType, rowNum, output);
    if (code != TSDB_CODE_SUCCESS) {
      SCL_ERR_JRET(code);
    }

    code = execFuncs.process(params, paramNum, output);
    if (code) {
      sclError("scalar function exec failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
      SCL_ERR_JRET(code);
    }
  } else {
    code = callUdfScalarFunc(node->functionName, params, paramNum, output);
    if (code != 0) {
      // Falls through to the cleanup below with the UDF's error code.
      sclError("fmExecFunction error. callUdfScalarFunc. function name: %s, udf code:%d", node->functionName, code);
    }
  }

_return:

  sclFreeParamList(params, paramNum);
  SCL_RET(code);
}

/**
 * Executes an AND / OR / NOT logic condition row by row.
 *
 * For each row the params are scanned in order: AND stops at the first false
 * value, OR at the first true value, NOT negates its single operand. A param
 * without column data marks the row as incomplete unless a later param
 * short-circuits. Complete rows are appended to the BOOL output column and
 * true results are counted in output->numOfQualified. During constant
 * calculation, an incomplete last row makes the output be released and its
 * row count reset to 0.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for a malformed
 *         node, or an allocation/initialisation error.
 */
int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *output) {
  if (NULL == node->pParameterList || node->pParameterList->length <= 0) {
    sclError("invalid logic parameter list, list:%p, paramNum:%d", node->pParameterList,
             node->pParameterList ? node->pParameterList->length : 0);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (TSDB_DATA_TYPE_BOOL != node->node.resType.type) {
    sclError("invalid logic resType, type:%d", node->node.resType.type);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (LOGIC_COND_TYPE_NOT == node->condType && node->pParameterList->length > 1) {
    sclError("invalid NOT operation parameter number, paramNum:%d", node->pParameterList->length);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SScalarParam *params = NULL;
  int32_t       rowNum = 0;
  int32_t       paramNum = 0;
  int32_t       code = 0;
  SCL_ERR_RET(sclInitParamList(&params, node->pParameterList, ctx, &paramNum, &rowNum));
  if (NULL == params) {
    output->numOfRows = 0;
    return TSDB_CODE_SUCCESS;
  }

  int32_t type = node->node.resType.type;
  output->numOfRows = rowNum;

  SDataType t = {.type = type, .bytes = tDataTypes[type].bytes};
  code = sclCreateColumnInfoData(&t, rowNum, output);
  if (code != TSDB_CODE_SUCCESS) {
    SCL_ERR_JRET(code);
  }

  int32_t numOfQualified = 0;

  bool value = false;
  bool complete = true;
  for (int32_t row = 0; row < rowNum; ++row) {
    complete = true;
    for (int32_t idx = 0; idx < paramNum; ++idx) {
      SColumnInfoData *pCol = params[idx].columnData;
      if (NULL == pCol) {
        complete = false;
        continue;
      }

      char *p = colDataGetData(pCol, row);
      GET_TYPED_DATA(value, bool, pCol->info.type, p);

      bool shortCircuit = (LOGIC_COND_TYPE_AND == node->condType && (false == value)) ||
                          (LOGIC_COND_TYPE_OR == node->condType && value);
      if (shortCircuit) {
        // The row's result is decided regardless of any unavailable operand.
        complete = true;
        break;
      }

      if (LOGIC_COND_TYPE_NOT == node->condType) {
        value = !value;
      }
    }

    if (!complete) {
      continue;
    }

    colDataAppend(output->columnData, row, (char *)&value, false);
    if (value) {
      numOfQualified++;
    }
  }

  if (SCL_IS_CONST_CALC(ctx) && (false == complete)) {
    sclFreeParam(output);
    output->numOfRows = 0;
  }

  output->numOfQualified = numOfQualified;

_return:
  sclFreeParamList(params, paramNum);
  SCL_RET(code);
}

// Executes a unary or binary operator node and stores the result in `output`.
//
// node   - operator to evaluate; its opType selects the vector function and the operand count.
// ctx    - scalar context passed to sclInitOperatorParams.
// output - result parameter; a column is created for it when output->columnData is NULL.
//
// Returns TSDB_CODE_QRY_JSON_IN_ERROR when a JSON value node is the left operand of IN / NOT IN,
// the failure code of sclInitOperatorParams / sclCreateColumnInfoData, or the value of terrno
// observed right after the operator function ran.
int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *output) {
  SScalarParam *params = NULL;
  int32_t       rowNum = 0;
  int32_t       code = 0;
  // Determined before any `goto _return`: the cleanup below reads paramNum, and it used to be
  // initialized only after the first jump, leaving it indeterminate on that error path.
  int32_t       paramNum = scalarGetOperatorParamNum(node->opType);

  // JSON values are not supported as the left operand of IN / NOT IN
  if (nodeType(node->pLeft) == QUERY_NODE_VALUE) {
    SValueNode *valueNode = (SValueNode *)node->pLeft;
    if (valueNode->node.resType.type == TSDB_DATA_TYPE_JSON &&
        (node->opType == OP_TYPE_IN || node->opType == OP_TYPE_NOT_IN)) {
      SCL_RET(TSDB_CODE_QRY_JSON_IN_ERROR);
    }
  }

  SCL_ERR_RET(sclInitOperatorParams(&params, node, ctx, &rowNum));
  if (output->columnData == NULL) {
    code = sclCreateColumnInfoData(&node->node.resType, rowNum, output);
    if (code != TSDB_CODE_SUCCESS) {
      SCL_ERR_JRET(code);
    }
  }

  _bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(node->opType);

  SScalarParam *pLeft = &params[0];
  SScalarParam *pRight = paramNum > 1 ? &params[1] : NULL;

  // The operator function reports failure through terrno, so reset it first and read it back.
  terrno = TSDB_CODE_SUCCESS;
  OperatorFn(pLeft, pRight, output, TSDB_ORDER_ASC);
  code = terrno;

_return:

  sclFreeParamList(params, paramNum);
  SCL_RET(code);
}

D
dapan1121 已提交
853 854 855 856
// Evaluates a CASE [expr] WHEN ... THEN ... [ELSE ...] END node into `output`.
//
// The first WHEN/THEN pair is evaluated here; for rows it does not match, the remaining pairs
// (and the ELSE branch) are handled by sclWalkCaseWhenList / sclWalkWhenList.
//
// node   - case-when node; pWhenThenList must be non-empty.
// ctx    - scalar context; when ctx->pBlockList is set, its first block supplies the row count,
//          otherwise a single row is computed.
// output - receives a column of the node's result type.
//
// Returns TSDB_CODE_INVALID_PARA for an empty WHEN list, the first failing callee's code on
// error (in which case `output` is freed as well), or TSDB_CODE_SUCCESS.
int32_t sclExecCaseWhen(SCaseWhenNode *node, SScalarCtx *ctx, SScalarParam *output) {
  int32_t       code = 0;
  int32_t       rowNum = 1;
  bool          complete = false;
  SScalarParam *pCase = NULL;
  SScalarParam *pElse = NULL;
  SScalarParam *pWhen = NULL;
  SScalarParam *pThen = NULL;
  SScalarParam  comp = {0};

  if (NULL == node->pWhenThenList || node->pWhenThenList->length <= 0) {
    sclError("invalid whenThen list");
    SCL_ERR_RET(TSDB_CODE_INVALID_PARA);
  }

  if (ctx->pBlockList) {
    SSDataBlock *pBlock = taosArrayGetP(ctx->pBlockList, 0);
    rowNum = pBlock->info.rows;
    output->numOfRows = pBlock->info.rows;
  }

  SCL_ERR_JRET(sclCreateColumnInfoData(&node->node.resType, rowNum, output));

  SCL_ERR_JRET(sclGetNodeRes(node->pCase, ctx, &pCase));
  SCL_ERR_JRET(sclGetNodeRes(node->pElse, ctx, &pElse));

  // Scratch BOOL column used to hold the "case == when" comparison result.
  SDataType compType = {.type = TSDB_DATA_TYPE_BOOL, .bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes};
  SCL_ERR_JRET(sclCreateColumnInfoData(&compType, rowNum, &comp));

  SWhenThenNode *pFirst = (SWhenThenNode *)node->pWhenThenList->pHead->pNode;

  SCL_ERR_JRET(sclGetNodeRes(pFirst->pWhen, ctx, &pWhen));
  SCL_ERR_JRET(sclGetNodeRes(pFirst->pThen, ctx, &pThen));

  if (pCase) {
    // CASE <expr> WHEN <value>: a row matches when the two compare equal.
    vectorCompare(pCase, pWhen, &comp, TSDB_ORDER_ASC, OP_TYPE_EQUAL);

    for (int32_t i = 0; i < rowNum; ++i) {
      bool *equal = (bool *)colDataGetData(comp.columnData, (comp.numOfRows > 1 ? i : 0));
      if (!*equal) {
        SCL_ERR_JRET(sclWalkCaseWhenList(ctx, node->pWhenThenList, node->pWhenThenList->pHead->pNext, pCase, pElse, &comp, output, i, rowNum, &complete));
        if (complete) {
          break;
        }
        continue;
      }

      // Single-row operands are read from row 0 for every output row.
      int32_t thenRow = (pThen->numOfRows > 1 ? i : 0);
      colDataAppend(output->columnData, i, colDataGetData(pThen->columnData, thenRow), colDataIsNull_s(pThen->columnData, thenRow));
      if (0 == i && 1 == pCase->numOfRows && 1 == pWhen->numOfRows && 1 == pThen->numOfRows && rowNum > 1) {
        // All operands are single-row: extend the first result to all rows and stop.
        SCL_ERR_JRET(sclExtendResRows(output, output, ctx->pBlockList));
        break;
      }
    }
  } else {
    // CASE WHEN <condition>: the WHEN column is read directly as a boolean.
    for (int32_t i = 0; i < rowNum; ++i) {
      bool *whenValue = (bool *)colDataGetData(pWhen->columnData, (pWhen->numOfRows > 1 ? i : 0));
      if (!*whenValue) {
        SCL_ERR_JRET(sclWalkWhenList(ctx, node->pWhenThenList, node->pWhenThenList->pHead->pNext, pElse, output, i, rowNum, &complete, (pWhen->numOfRows == 1 && pThen->numOfRows == 1)));
        if (complete) {
          break;
        }
        continue;
      }

      int32_t thenRow = (pThen->numOfRows > 1 ? i : 0);
      colDataAppend(output->columnData, i, colDataGetData(pThen->columnData, thenRow), colDataIsNull_s(pThen->columnData, thenRow));
      if (0 == i && 1 == pWhen->numOfRows && 1 == pThen->numOfRows && rowNum > 1) {
        SCL_ERR_JRET(sclExtendResRows(output, output, ctx->pBlockList));
        break;
      }
    }
  }

  // Success: release the intermediates, keep `output`.
  sclFreeParam(pCase);
  sclFreeParam(pElse);
  sclFreeParam(&comp);
  sclFreeParam(pWhen);
  sclFreeParam(pThen);

  return TSDB_CODE_SUCCESS;

_return:

  // Failure: release the intermediates and the partially built `output`.
  sclFreeParam(pCase);
  sclFreeParam(pElse);
  sclFreeParam(&comp);
  sclFreeParam(pWhen);
  sclFreeParam(pThen);
  sclFreeParam(output);

  SCL_RET(code);
}


H
Hongze Cheng 已提交
948
// Replaces the operator node at *pNode with a constant value node.
//
// For opType <= OP_TYPE_CALC_MAX the replacement has type NULL; for every other operator it is
// the BOOL constant false. The original node is destroyed.
//
// Returns DEAL_RES_CONTINUE on success; on allocation failure sets ctx->code to
// TSDB_CODE_QRY_OUT_OF_MEMORY and returns DEAL_RES_ERROR, leaving *pNode untouched.
EDealRes sclRewriteNullInOptr(SNode **pNode, SScalarCtx *ctx, EOperatorType opType) {
  SValueNode *pValue = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
  if (NULL == pValue) {
    sclError("make value node failed");
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  if (opType <= OP_TYPE_CALC_MAX) {
    pValue->node.resType.type = TSDB_DATA_TYPE_NULL;
  } else {
    pValue->node.resType.type = TSDB_DATA_TYPE_BOOL;
    pValue->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes;
    pValue->datum.b = false;
  }

  nodesDestroyNode(*pNode);
  *pNode = (SNode *)pValue;

  return DEAL_RES_CONTINUE;
}

H
Hongze Cheng 已提交
980
// Walker callback: records in *(bool *)pContext whether the visited function node is an
// aggregate function, and ends the walk as soon as one is found.
// Non-function nodes leave the flag untouched.
EDealRes sclAggFuncWalker(SNode *pNode, void *pContext) {
  if (QUERY_NODE_FUNCTION != nodeType(pNode)) {
    return DEAL_RES_CONTINUE;
  }

  bool *pIsAgg = (bool *)pContext;
  *pIsAgg = fmIsAggFunc(((SFunctionNode *)pNode)->funcId);

  return *pIsAgg ? DEAL_RES_END : DEAL_RES_CONTINUE;
}

H
Hongze Cheng 已提交
992
// Returns true when walking the expression `pNode` with sclAggFuncWalker finds an aggregate
// function node.
bool sclContainsAggFuncNode(SNode *pNode) {
  bool found = false;
  nodesWalkExpr(pNode, sclAggFuncWalker, &found);
  return found;
}
D
dapan1121 已提交
997

H
Hongze Cheng 已提交
998
EDealRes sclRewriteNonConstOperator(SNode **pNode, SScalarCtx *ctx) {
D
dapan1121 已提交
999
  SOperatorNode *node = (SOperatorNode *)*pNode;
H
Hongze Cheng 已提交
1000
  int32_t        code = 0;
D
dapan1121 已提交
1001 1002 1003

  if (node->pLeft && (QUERY_NODE_VALUE == nodeType(node->pLeft))) {
    SValueNode *valueNode = (SValueNode *)node->pLeft;
H
Hongze Cheng 已提交
1004 1005
    if (SCL_IS_NULL_VALUE_NODE(valueNode) && (node->opType != OP_TYPE_IS_NULL && node->opType != OP_TYPE_IS_NOT_NULL) &&
        (!sclContainsAggFuncNode(node->pRight))) {
D
dapan1121 已提交
1006
      return sclRewriteNullInOptr(pNode, ctx, node->opType);
D
dapan1121 已提交
1007
    }
D
dapan 已提交
1008

H
Hongze Cheng 已提交
1009 1010 1011
    if (IS_STR_DATA_TYPE(valueNode->node.resType.type) && node->pRight && nodesIsExprNode(node->pRight) &&
        ((SExprNode *)node->pRight)->resType.type == TSDB_DATA_TYPE_TIMESTAMP) {
      code = sclConvertToTsValueNode(((SExprNode *)node->pRight)->resType.precision, valueNode);
D
dapan1121 已提交
1012 1013 1014 1015
      if (code) {
        ctx->code = code;
        return DEAL_RES_ERROR;
      }
D
dapan 已提交
1016
    }
1017 1018 1019

    if (SCL_IS_COMPARISON_OPERATOR(node->opType) && SCL_DOWNGRADE_DATETYPE(valueNode->node.resType.type)) {
      sclDowngradeValueType(valueNode);
H
Hongze Cheng 已提交
1020
    }
D
dapan1121 已提交
1021 1022 1023 1024
  }

  if (node->pRight && (QUERY_NODE_VALUE == nodeType(node->pRight))) {
    SValueNode *valueNode = (SValueNode *)node->pRight;
H
Hongze Cheng 已提交
1025 1026
    if (SCL_IS_NULL_VALUE_NODE(valueNode) && (node->opType != OP_TYPE_IS_NULL && node->opType != OP_TYPE_IS_NOT_NULL) &&
        (!sclContainsAggFuncNode(node->pLeft))) {
D
dapan1121 已提交
1027
      return sclRewriteNullInOptr(pNode, ctx, node->opType);
D
dapan1121 已提交
1028
    }
D
dapan 已提交
1029

H
Hongze Cheng 已提交
1030 1031 1032
    if (IS_STR_DATA_TYPE(valueNode->node.resType.type) && node->pLeft && nodesIsExprNode(node->pLeft) &&
        ((SExprNode *)node->pLeft)->resType.type == TSDB_DATA_TYPE_TIMESTAMP) {
      code = sclConvertToTsValueNode(((SExprNode *)node->pLeft)->resType.precision, valueNode);
D
dapan1121 已提交
1033 1034 1035 1036
      if (code) {
        ctx->code = code;
        return DEAL_RES_ERROR;
      }
D
dapan 已提交
1037
    }
1038 1039 1040

    if (SCL_IS_COMPARISON_OPERATOR(node->opType) && SCL_DOWNGRADE_DATETYPE(valueNode->node.resType.type)) {
      sclDowngradeValueType(valueNode);
H
Hongze Cheng 已提交
1041
    }
D
dapan1121 已提交
1042 1043 1044 1045
  }

  if (node->pRight && (QUERY_NODE_NODE_LIST == nodeType(node->pRight))) {
    SNodeListNode *listNode = (SNodeListNode *)node->pRight;
H
Hongze Cheng 已提交
1046
    SNode         *tnode = NULL;
D
dapan1121 已提交
1047 1048 1049 1050 1051
    WHERE_EACH(tnode, listNode->pNodeList) {
      if (SCL_IS_NULL_VALUE_NODE(tnode)) {
        if (node->opType == OP_TYPE_IN) {
          ERASE_NODE(listNode->pNodeList);
          continue;
H
Hongze Cheng 已提交
1052
        } else {  // OP_TYPE_NOT_IN
D
dapan1121 已提交
1053
          return sclRewriteNullInOptr(pNode, ctx, node->opType);
D
dapan1121 已提交
1054 1055 1056 1057 1058 1059 1060
        }
      }

      WHERE_NEXT;
    }

    if (listNode->pNodeList->length <= 0) {
D
dapan1121 已提交
1061
      return sclRewriteNullInOptr(pNode, ctx, node->opType);
D
dapan1121 已提交
1062 1063 1064 1065 1066 1067
    }
  }

  return DEAL_RES_CONTINUE;
}

H
Hongze Cheng 已提交
1068
// Folds a function call with all-constant parameters into a value node.
//
// The node is left alone (DEAL_RES_CONTINUE) when the function is not a scalar function and
// ctx->dual is false, or when any parameter is not constant. Otherwise the function is
// executed, its first result row is copied into a new value node, and that node replaces *pNode.
//
// Returns DEAL_RES_CONTINUE on success or when nothing was rewritten; on failure ctx->code is
// set (execution error or TSDB_CODE_QRY_OUT_OF_MEMORY) and DEAL_RES_ERROR is returned.
EDealRes sclRewriteFunction(SNode **pNode, SScalarCtx *ctx) {
  SFunctionNode *node = (SFunctionNode *)*pNode;
  SNode         *tnode = NULL;
  if (!fmIsScalarFunc(node->funcId) && (!ctx->dual)) {
    return DEAL_RES_CONTINUE;
  }

  FOREACH(tnode, node->pParameterList) {
    if (!SCL_IS_CONST_NODE(tnode)) {
      return DEAL_RES_CONTINUE;
    }
  }

  SScalarParam output = {0};

  ctx->code = sclExecFunction(node, ctx, &output);
  if (ctx->code) {
    return DEAL_RES_ERROR;
  }

  SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
  if (NULL == res) {
    sclError("make value node failed");
    sclFreeParam(&output);
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  res->translate = true;

  res->node.resType.type = output.columnData->info.type;
  res->node.resType.bytes = output.columnData->info.bytes;
  res->node.resType.scale = output.columnData->info.scale;
  res->node.resType.precision = output.columnData->info.precision;
  if (colDataIsNull_s(output.columnData, 0)) {
    res->isNull = true;
  } else {
    int32_t type = output.columnData->info.type;
    if (type == TSDB_DATA_TYPE_JSON) {
      int32_t len = getJsonValueLen(output.columnData->pData);
      res->datum.p = taosMemoryCalloc(len, 1);
      if (NULL == res->datum.p) {
        // Previously the unchecked pointer went straight into memcpy.
        goto _oom;
      }
      memcpy(res->datum.p, output.columnData->pData, len);
    } else if (IS_VAR_DATA_TYPE(type)) {
      // One extra zeroed byte is allocated past the var-data payload.
      res->datum.p = taosMemoryCalloc(varDataTLen(output.columnData->pData) + 1, 1);
      if (NULL == res->datum.p) {
        goto _oom;
      }
      res->node.resType.bytes = varDataTLen(output.columnData->pData);
      memcpy(res->datum.p, output.columnData->pData, varDataTLen(output.columnData->pData));
    } else {
      nodesSetValueNodeValue(res, output.columnData->pData);
    }
  }

  nodesDestroyNode(*pNode);
  *pNode = (SNode *)res;

  sclFreeParam(&output);
  return DEAL_RES_CONTINUE;

_oom:
  // Copy buffer allocation failed: drop the half-built node and the execution result,
  // leaving *pNode as it was.
  sclError("alloc value buffer failed");
  nodesDestroyNode((SNode *)res);
  sclFreeParam(&output);
  ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
  return DEAL_RES_ERROR;
}

H
Hongze Cheng 已提交
1127
// Folds a logic condition into a value node when sclExecLogic produced a result.
//
// When sclExecLogic reports zero rows the node is left unchanged. Otherwise the first result
// value is moved/copied into a new value node that replaces *pNode.
//
// Returns DEAL_RES_CONTINUE on success or when nothing was rewritten; on failure ctx->code is
// set and DEAL_RES_ERROR is returned.
EDealRes sclRewriteLogic(SNode **pNode, SScalarCtx *ctx) {
  SLogicConditionNode *pLogic = (SLogicConditionNode *)*pNode;
  SScalarParam         result = {0};

  ctx->code = sclExecLogic(pLogic, ctx, &result);
  if (ctx->code) {
    return DEAL_RES_ERROR;
  }

  if (0 == result.numOfRows) {
    return DEAL_RES_CONTINUE;
  }

  SValueNode *pValue = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
  if (NULL == pValue) {
    sclError("make value node failed");
    sclFreeParam(&result);
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  pValue->node.resType = pLogic->node.resType;
  pValue->translate = true;

  int32_t colType = result.columnData->info.type;
  if (IS_VAR_DATA_TYPE(colType)) {
    // Hand the buffer over to the value node so freeing `result` below does not release it.
    pValue->datum.p = result.columnData->pData;
    result.columnData->pData = NULL;
  } else {
    nodesSetValueNodeValue(pValue, result.columnData->pData);
  }

  nodesDestroyNode(*pNode);
  *pNode = (SNode *)pValue;

  sclFreeParam(&result);
  return DEAL_RES_CONTINUE;
}

H
Hongze Cheng 已提交
1166
// Folds an operator with constant operands into a value node.
//
// When either operand is not constant the work is delegated to sclRewriteNonConstOperator.
// Otherwise the operator is executed and its first result row becomes a new value node that
// replaces *pNode.
//
// Returns the walker result; on failure ctx->code is set and DEAL_RES_ERROR is returned.
EDealRes sclRewriteOperator(SNode **pNode, SScalarCtx *ctx) {
  SOperatorNode *pOp = (SOperatorNode *)*pNode;

  bool allConst = SCL_IS_CONST_NODE(pOp->pLeft) && SCL_IS_CONST_NODE(pOp->pRight);
  if (!allConst) {
    return sclRewriteNonConstOperator(pNode, ctx);
  }

  SScalarParam result = {0};
  ctx->code = sclExecOperator(pOp, ctx, &result);
  if (ctx->code) {
    return DEAL_RES_ERROR;
  }

  SValueNode *pValue = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
  if (NULL == pValue) {
    sclError("make value node failed");
    sclFreeParam(&result);
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  pValue->translate = true;
  pValue->node.resType = pOp->node.resType;

  if (colDataIsNull_s(result.columnData, 0)) {
    pValue->isNull = true;
  } else {
    int32_t colType = result.columnData->info.type;
    if (IS_VAR_DATA_TYPE(colType)) {  // todo refactor
      // Hand the buffer over to the value node so freeing `result` below does not release it.
      pValue->datum.p = result.columnData->pData;
      result.columnData->pData = NULL;
    } else {
      nodesSetValueNodeValue(pValue, result.columnData->pData);
    }
  }

  nodesDestroyNode(*pNode);
  *pNode = (SNode *)pValue;

  sclFreeParam(&result);
  return DEAL_RES_CONTINUE;
}

D
dapan1121 已提交
1210 1211 1212 1213 1214 1215 1216 1217 1218
// Folds a CASE WHEN node into a value node when the case expression, the else expression and
// every WHEN / THEN expression are constant; otherwise the node is left unchanged.
//
// Returns DEAL_RES_CONTINUE on success or when nothing was rewritten; on failure ctx->code is
// set and DEAL_RES_ERROR is returned.
EDealRes sclRewriteCaseWhen(SNode** pNode, SScalarCtx *ctx) {
  SCaseWhenNode *pCaseWhen = (SCaseWhenNode *)*pNode;

  bool constEnds = SCL_IS_CONST_NODE(pCaseWhen->pCase) && SCL_IS_CONST_NODE(pCaseWhen->pElse);
  if (!constEnds) {
    return DEAL_RES_CONTINUE;
  }

  SNode* pItem = NULL;
  FOREACH(pItem, pCaseWhen->pWhenThenList) {
    SWhenThenNode* pPair = (SWhenThenNode*)pItem;
    bool constPair = SCL_IS_CONST_NODE(pPair->pWhen) && SCL_IS_CONST_NODE(pPair->pThen);
    if (!constPair) {
      return DEAL_RES_CONTINUE;
    }
  }

  SScalarParam result = {0};
  ctx->code = sclExecCaseWhen(pCaseWhen, ctx, &result);
  if (ctx->code) {
    return DEAL_RES_ERROR;
  }

  SValueNode *pValue = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
  if (NULL == pValue) {
    sclError("make value node failed");
    sclFreeParam(&result);
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  pValue->translate = true;
  pValue->node.resType = pCaseWhen->node.resType;

  if (colDataIsNull_s(result.columnData, 0)) {
    pValue->isNull = true;
  } else {
    int32_t colType = result.columnData->info.type;
    if (IS_VAR_DATA_TYPE(colType)) {  // todo refactor
      // Hand the buffer over to the value node so freeing `result` below does not release it.
      pValue->datum.p = result.columnData->pData;
      result.columnData->pData = NULL;
    } else {
      nodesSetValueNodeValue(pValue, result.columnData->pData);
    }
  }

  nodesDestroyNode(*pNode);
  *pNode = (SNode*)pValue;

  sclFreeParam(&result);
  return DEAL_RES_CONTINUE;
}


H
Hongze Cheng 已提交
1263
// Rewriter callback for constant folding: dispatches operator, function, logic-condition and
// case-when nodes to their rewrite routine; every other node type is skipped.
// pContext is the SScalarCtx of the current run.
EDealRes sclConstantsRewriter(SNode **pNode, void *pContext) {
  SScalarCtx *ctx = (SScalarCtx *)pContext;

  switch (nodeType(*pNode)) {
    case QUERY_NODE_OPERATOR:
      return sclRewriteOperator(pNode, ctx);
    case QUERY_NODE_FUNCTION:
      return sclRewriteFunction(pNode, ctx);
    case QUERY_NODE_LOGIC_CONDITION:
      return sclRewriteLogic(pNode, ctx);
    case QUERY_NODE_CASE_WHEN:
      return sclRewriteCaseWhen(pNode, ctx);
    default:
      break;
  }

  return DEAL_RES_CONTINUE;
}

H
Hongze Cheng 已提交
1285
// Executes a function node and stores its result in ctx->pRes, keyed by the node pointer.
//
// Returns DEAL_RES_CONTINUE on success. On failure ctx->code is set (execution error, or
// TSDB_CODE_QRY_OUT_OF_MEMORY when the hash insert fails) and DEAL_RES_ERROR is returned.
EDealRes sclWalkFunction(SNode *pNode, SScalarCtx *ctx) {
  SFunctionNode *node = (SFunctionNode *)pNode;
  SScalarParam   output = {0};

  ctx->code = sclExecFunction(node, ctx, &output);
  if (ctx->code) {
    return DEAL_RES_ERROR;
  }

  if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
    // The hash did not take the result, so nothing else will release it.
    sclFreeParam(&output);
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  return DEAL_RES_CONTINUE;
}

H
Hongze Cheng 已提交
1302
// Executes a logic-condition node and stores its result in ctx->pRes, keyed by the node pointer.
//
// Returns DEAL_RES_CONTINUE on success. On failure ctx->code is set (execution error, or
// TSDB_CODE_QRY_OUT_OF_MEMORY when the hash insert fails) and DEAL_RES_ERROR is returned.
EDealRes sclWalkLogic(SNode *pNode, SScalarCtx *ctx) {
  SLogicConditionNode *node = (SLogicConditionNode *)pNode;
  SScalarParam         output = {0};

  ctx->code = sclExecLogic(node, ctx, &output);
  if (ctx->code) {
    return DEAL_RES_ERROR;
  }

  if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
    // The hash did not take the result, so nothing else will release it.
    sclFreeParam(&output);
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  return DEAL_RES_CONTINUE;
}

H
Hongze Cheng 已提交
1319
// Executes an operator node and stores its result in ctx->pRes, keyed by the node pointer.
//
// Returns DEAL_RES_CONTINUE on success. On failure ctx->code is set (execution error, or
// TSDB_CODE_QRY_OUT_OF_MEMORY when the hash insert fails) and DEAL_RES_ERROR is returned.
EDealRes sclWalkOperator(SNode *pNode, SScalarCtx *ctx) {
  SOperatorNode *node = (SOperatorNode *)pNode;
  SScalarParam   output = {0};

  ctx->code = sclExecOperator(node, ctx, &output);
  if (ctx->code) {
    return DEAL_RES_ERROR;
  }

  if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
    // The hash did not take the result, so nothing else will release it.
    sclFreeParam(&output);
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  return DEAL_RES_CONTINUE;
}

H
Hongze Cheng 已提交
1336
// Copies the computed result of a target node's expression into the destination column.
//
// The destination block is the entry of ctx->pBlockList whose info.blockId equals
// target->dataBlockId; the destination column is target->slotId within that block. The result
// is looked up in ctx->pRes by the expression pointer, assigned to the column, then freed and
// removed from the hash. The block's row count is set to the result's row count.
//
// Returns DEAL_RES_CONTINUE on success; otherwise sets ctx->code (TSDB_CODE_QRY_INVALID_INPUT
// for a bad block/slot, TSDB_CODE_QRY_APP_ERROR for a missing result) and returns DEAL_RES_ERROR.
EDealRes sclWalkTarget(SNode *pNode, SScalarCtx *ctx) {
  STargetNode *target = (STargetNode *)pNode;

  if (target->dataBlockId >= taosArrayGetSize(ctx->pBlockList)) {
    sclError("target tupleId is too big, tupleId:%d, dataBlockNum:%d", target->dataBlockId,
             (int32_t)taosArrayGetSize(ctx->pBlockList));
    ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
    return DEAL_RES_ERROR;
  }

  // Locate the block by id rather than by position in the list.
  SSDataBlock *block = NULL;
  for (int32_t i = 0; i < taosArrayGetSize(ctx->pBlockList); ++i) {
    SSDataBlock *candidate = taosArrayGetP(ctx->pBlockList, i);
    if (candidate->info.blockId == target->dataBlockId) {
      block = candidate;
      break;
    }
  }

  if (NULL == block) {
    sclError("column tupleId is too big, tupleId:%d, dataBlockNum:%d", target->dataBlockId,
             (int32_t)taosArrayGetSize(ctx->pBlockList));
    ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
    return DEAL_RES_ERROR;
  }

  // The slot must exist in the block before it can be fetched.
  if (target->slotId >= taosArrayGetSize(block->pDataBlock)) {
    sclError("target slot not exist, dataBlockId:%d, slotId:%d, dataBlockNum:%d", target->dataBlockId, target->slotId,
             (int32_t)taosArrayGetSize(block->pDataBlock));
    ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
    return DEAL_RES_ERROR;
  }

  SColumnInfoData *dstCol = taosArrayGet(block->pDataBlock, target->slotId);

  SScalarParam *exprRes = (SScalarParam *)taosHashGet(ctx->pRes, (void *)&target->pExpr, POINTER_BYTES);
  if (NULL == exprRes) {
    sclError("no valid res in hash, node:%p, type:%d", target->pExpr, nodeType(target->pExpr));
    ctx->code = TSDB_CODE_QRY_APP_ERROR;
    return DEAL_RES_ERROR;
  }

  colDataAssign(dstCol, exprRes->columnData, exprRes->numOfRows, NULL);
  block->info.rows = exprRes->numOfRows;

  sclFreeParam(exprRes);
  taosHashRemove(ctx->pRes, (void *)&target->pExpr, POINTER_BYTES);
  return DEAL_RES_CONTINUE;
}
D
dapan 已提交
1388

D
dapan1121 已提交
1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406
// Executes a case-when node and stores its result in ctx->pRes, keyed by the node pointer.
//
// Returns DEAL_RES_CONTINUE on success. On failure ctx->code is set (execution error, or
// TSDB_CODE_QRY_OUT_OF_MEMORY when the hash insert fails) and DEAL_RES_ERROR is returned.
EDealRes sclWalkCaseWhen(SNode* pNode, SScalarCtx *ctx) {
  SCaseWhenNode *node = (SCaseWhenNode *)pNode;
  SScalarParam output = {0};

  ctx->code = sclExecCaseWhen(node, ctx, &output);
  if (ctx->code) {
    return DEAL_RES_ERROR;
  }

  if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
    // The hash did not take the result, so nothing else will release it.
    sclFreeParam(&output);
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  return DEAL_RES_CONTINUE;
}


H
Hongze Cheng 已提交
1407
// Walker callback for scalar calculation.
//
// Value, node-list, column, left-value and when-then nodes need no work here. Operator,
// function, logic-condition, target and case-when nodes are dispatched to their handler.
// Any other node type is rejected with TSDB_CODE_QRY_INVALID_INPUT in ctx->code.
// pContext is the SScalarCtx of the current run.
EDealRes sclCalcWalker(SNode *pNode, void *pContext) {
  SScalarCtx *ctx = (SScalarCtx *)pContext;

  switch (nodeType(pNode)) {
    case QUERY_NODE_VALUE:
    case QUERY_NODE_NODE_LIST:
    case QUERY_NODE_COLUMN:
    case QUERY_NODE_LEFT_VALUE:
    case QUERY_NODE_WHEN_THEN:
      return DEAL_RES_CONTINUE;
    case QUERY_NODE_OPERATOR:
      return sclWalkOperator(pNode, ctx);
    case QUERY_NODE_FUNCTION:
      return sclWalkFunction(pNode, ctx);
    case QUERY_NODE_LOGIC_CONDITION:
      return sclWalkLogic(pNode, ctx);
    case QUERY_NODE_TARGET:
      return sclWalkTarget(pNode, ctx);
    case QUERY_NODE_CASE_WHEN:
      return sclWalkCaseWhen(pNode, ctx);
    default:
      break;
  }

  sclError("invalid node type for scalar calculating, type:%d", nodeType(pNode));
  ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
  return DEAL_RES_ERROR;
}

D
dapan1121 已提交
1440
// Runs constant folding over the expression tree rooted at pNode.
//
// pNode - expression root; must not be NULL.
// dual  - copied into the context's `dual` flag (read by sclRewriteFunction).
// pRes  - on success receives the root after rewriting (it may have been replaced).
//
// Returns TSDB_CODE_QRY_INVALID_INPUT for a NULL root, TSDB_CODE_QRY_OUT_OF_MEMORY when the
// result hash cannot be created, or the code recorded in the context by the rewriter.
int32_t sclCalcConstants(SNode *pNode, bool dual, SNode **pRes) {
  if (NULL == pNode) {
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t    code = 0;
  SScalarCtx ctx = {0};

  ctx.dual = dual;
  ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (NULL == ctx.pRes) {
    sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM);
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  // The rewriter reports failures through ctx.code.
  nodesRewriteExprPostOrder(&pNode, sclConstantsRewriter, (void *)&ctx);
  SCL_ERR_JRET(ctx.code);

  *pRes = pNode;

_return:

  sclFreeRes(ctx.pRes);
  return code;
}
D
dapan1121 已提交
1463

H
Hongze Cheng 已提交
1464 1465
// Sets the result type of a unary minus operator to DOUBLE.
// Returns TSDB_CODE_TSC_INVALID_OPERATION when the left operand's type is not mathable.
static int32_t sclGetMinusOperatorResType(SOperatorNode *pOp) {
  SExprNode *pOperand = (SExprNode *)(pOp->pLeft);

  if (IS_MATHABLE_TYPE(pOperand->resType.type)) {
    pOp->node.resType.type = TSDB_DATA_TYPE_DOUBLE;
    pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes;
    return TSDB_CODE_SUCCESS;
  }

  return TSDB_CODE_TSC_INVALID_OPERATION;
}

H
Hongze Cheng 已提交
1473 1474 1475
// Derives the result type of a binary arithmetic operator.
//
// Rejected (TSDB_CODE_TSC_INVALID_OPERATION): timestamp with timestamp, and timestamp with a
// var-length or floating-point operand. Timestamp with an integer or BOOL operand yields
// TIMESTAMP; every other combination yields DOUBLE.
static int32_t sclGetMathOperatorResType(SOperatorNode *pOp) {
  SDataType ldt = ((SExprNode *)(pOp->pLeft))->resType;
  SDataType rdt = ((SExprNode *)(pOp->pRight))->resType;

  bool leftIsTs = (TSDB_DATA_TYPE_TIMESTAMP == ldt.type);
  bool rightIsTs = (TSDB_DATA_TYPE_TIMESTAMP == rdt.type);

  if ((leftIsTs && rightIsTs) ||
      (leftIsTs && (IS_VAR_DATA_TYPE(rdt.type) || IS_FLOAT_TYPE(rdt.type))) ||
      (rightIsTs && (IS_VAR_DATA_TYPE(ldt.type) || IS_FLOAT_TYPE(ldt.type)))) {
    return TSDB_CODE_TSC_INVALID_OPERATION;
  }

  bool yieldsTs = (leftIsTs && (IS_INTEGER_TYPE(rdt.type) || TSDB_DATA_TYPE_BOOL == rdt.type)) ||
                  (rightIsTs && (IS_INTEGER_TYPE(ldt.type) || TSDB_DATA_TYPE_BOOL == ldt.type));

  int32_t resTypeId = yieldsTs ? TSDB_DATA_TYPE_TIMESTAMP : TSDB_DATA_TYPE_DOUBLE;
  pOp->node.resType.type = resTypeId;
  pOp->node.resType.bytes = tDataTypes[resTypeId].bytes;
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
1495 1496
// Derives the result type (BOOL) of a comparison operator.
//
// For IN / NOT IN the right operand's result type is overwritten with the left operand's.
// For operators accepted by nodesIsRegularOp the left operand must be var-length and the right
// operand must be a value node of string or NULL type; otherwise
// TSDB_CODE_TSC_INVALID_OPERATION is returned.
static int32_t sclGetCompOperatorResType(SOperatorNode *pOp) {
  SDataType ldt = ((SExprNode *)(pOp->pLeft))->resType;

  if (OP_TYPE_IN == pOp->opType || OP_TYPE_NOT_IN == pOp->opType) {
    ((SExprNode *)(pOp->pRight))->resType = ldt;
  } else if (nodesIsRegularOp(pOp)) {
    SDataType rdt = ((SExprNode *)(pOp->pRight))->resType;
    bool      operandsOk = IS_VAR_DATA_TYPE(ldt.type) && QUERY_NODE_VALUE == nodeType(pOp->pRight) &&
                      (IS_STR_DATA_TYPE(rdt.type) || (rdt.type == TSDB_DATA_TYPE_NULL));
    if (!operandsOk) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
    }
  }

  pOp->node.resType.type = TSDB_DATA_TYPE_BOOL;
  pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes;
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
1511 1512 1513
// Derives the result type of a JSON operator.
//
// Requires a JSON left operand and a string right operand, else returns
// TSDB_CODE_TSC_INVALID_OPERATION. OP_TYPE_JSON_GET_VALUE yields JSON and
// OP_TYPE_JSON_CONTAINS yields BOOL; for any other opType the existing result type is kept.
// The byte width is then taken from tDataTypes for whatever type is set.
static int32_t sclGetJsonOperatorResType(SOperatorNode *pOp) {
  SDataType ldt = ((SExprNode *)(pOp->pLeft))->resType;
  SDataType rdt = ((SExprNode *)(pOp->pRight))->resType;

  bool operandsOk = (TSDB_DATA_TYPE_JSON == ldt.type) && IS_STR_DATA_TYPE(rdt.type);
  if (!operandsOk) {
    return TSDB_CODE_TSC_INVALID_OPERATION;
  }

  switch (pOp->opType) {
    case OP_TYPE_JSON_GET_VALUE:
      pOp->node.resType.type = TSDB_DATA_TYPE_JSON;
      break;
    case OP_TYPE_JSON_CONTAINS:
      pOp->node.resType.type = TSDB_DATA_TYPE_BOOL;
      break;
    default:
      break;
  }

  pOp->node.resType.bytes = tDataTypes[pOp->node.resType.type].bytes;
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
1526
// Bitwise operators always produce a BIGINT result; the operand types are not
// inspected here. Always returns TSDB_CODE_SUCCESS.
static int32_t sclGetBitwiseOperatorResType(SOperatorNode *pOp) {
  SDataType *pResType = &pOp->node.resType;

  pResType->type = TSDB_DATA_TYPE_BIGINT;
  pResType->bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
1532
// Thin wrapper: forwards pNode and pRes to sclCalcConstants() with its boolean
// (second) argument set to false, and returns that call's result code.
int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes) {
  int32_t code = sclCalcConstants(pNode, false, pRes);
  return code;
}
D
dapan1121 已提交
1533

H
Hongze Cheng 已提交
1534
// Thin wrapper: forwards pNode and pRes to sclCalcConstants() with its boolean
// (second) argument set to true, and returns that call's result code.
int32_t scalarCalculateConstantsFromDual(SNode *pNode, SNode **pRes) {
  int32_t code = sclCalcConstants(pNode, true, pRes);
  return code;
}
D
dapan1121 已提交
1535 1536 1537 1538 1539 1540

// Evaluate the expression tree rooted at pNode against pBlockList.
//
// The tree is walked post-order with sclCalcWalker; results are kept in a
// temporary hash that is looked up by node pointer. When pDst is non-NULL the
// root node's result is transferred into it:
//   - a single-row result goes through sclExtendResRows();
//   - otherwise pDst->columnData is grown to res->numOfRows and the result
//     column is copied in, together with numOfRows / numOfQualified.
//
// Returns TSDB_CODE_SUCCESS, or:
//   - TSDB_CODE_QRY_INVALID_INPUT  if pNode or pBlockList is NULL;
//   - TSDB_CODE_QRY_OUT_OF_MEMORY  if the result hash cannot be created;
//   - TSDB_CODE_QRY_APP_ERROR      if the root node has no entry in the hash;
//   - any error reported by the walker, sclExtendResRows(),
//     colInfoDataEnsureCapacity() or colDataAssign().
int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
  if (NULL == pNode || NULL == pBlockList) {
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t    code = 0;
  SScalarCtx ctx = {.code = 0, .pBlockList = pBlockList, .param = pDst ? pDst->param : NULL};

  // TODO: OPT performance
  ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (NULL == ctx.pRes) {
    sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM);
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  nodesWalkExprPostOrder(pNode, sclCalcWalker, (void *)&ctx);
  SCL_ERR_JRET(ctx.code);

  if (pDst) {
    SScalarParam *res = (SScalarParam *)taosHashGet(ctx.pRes, (void *)&pNode, POINTER_BYTES);
    if (NULL == res) {
      sclError("no valid res in hash, node:%p, type:%d", pNode, nodeType(pNode));
      SCL_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }

    if (1 == res->numOfRows) {
      SCL_ERR_JRET(sclExtendResRows(pDst, res, pBlockList));
    } else {
      // Do not copy into the destination column unless the capacity
      // reservation succeeded; propagate any failure instead of ignoring it.
      // On failure `res` is still in the hash and is released at _return.
      SCL_ERR_JRET(colInfoDataEnsureCapacity(pDst->columnData, res->numOfRows));
      SCL_ERR_JRET(colDataAssign(pDst->columnData, res->columnData, res->numOfRows, NULL));
      pDst->numOfRows = res->numOfRows;
      pDst->numOfQualified = res->numOfQualified;
    }

    // The root result has been handed over; release it and drop it from the
    // hash before the remaining entries are freed below.
    sclFreeParam(res);
    taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES);
  }

_return:
  sclFreeRes(ctx.pRes);
  return code;
}

H
Hongze Cheng 已提交
1579 1580 1581
// Compute and store the result type of operator node pOp.
//
// BLOB operands (left, or right when present) are rejected with
// TSDB_CODE_TSC_INVALID_OPERATION. Otherwise the work is dispatched by
// operator kind to the math / minus / bitwise / comparison / JSON helpers;
// OP_TYPE_ASSIGN copies the left operand's result type. Operator types not
// listed below leave the result type untouched and report success.
int32_t scalarGetOperatorResultType(SOperatorNode *pOp) {
  SExprNode *pLeftExpr = (SExprNode *)(pOp->pLeft);
  SExprNode *pRightExpr = (SExprNode *)(pOp->pRight);

  if (TSDB_DATA_TYPE_BLOB == pLeftExpr->resType.type) {
    return TSDB_CODE_TSC_INVALID_OPERATION;
  }
  // The right operand may be absent, so only inspect it when present.
  if (NULL != pRightExpr && TSDB_DATA_TYPE_BLOB == pRightExpr->resType.type) {
    return TSDB_CODE_TSC_INVALID_OPERATION;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  switch (pOp->opType) {
    // Arithmetic
    case OP_TYPE_ADD:
    case OP_TYPE_SUB:
    case OP_TYPE_MULTI:
    case OP_TYPE_DIV:
    case OP_TYPE_REM:
      code = sclGetMathOperatorResType(pOp);
      break;

    case OP_TYPE_MINUS:
      code = sclGetMinusOperatorResType(pOp);
      break;

    case OP_TYPE_ASSIGN:
      pOp->node.resType = pLeftExpr->resType;
      break;

    // Bitwise
    case OP_TYPE_BIT_AND:
    case OP_TYPE_BIT_OR:
      code = sclGetBitwiseOperatorResType(pOp);
      break;

    // Comparison, IS-predicates, pattern matching and set membership
    case OP_TYPE_GREATER_THAN:
    case OP_TYPE_GREATER_EQUAL:
    case OP_TYPE_LOWER_THAN:
    case OP_TYPE_LOWER_EQUAL:
    case OP_TYPE_EQUAL:
    case OP_TYPE_NOT_EQUAL:
    case OP_TYPE_IS_NULL:
    case OP_TYPE_IS_NOT_NULL:
    case OP_TYPE_IS_TRUE:
    case OP_TYPE_IS_FALSE:
    case OP_TYPE_IS_UNKNOWN:
    case OP_TYPE_IS_NOT_TRUE:
    case OP_TYPE_IS_NOT_FALSE:
    case OP_TYPE_IS_NOT_UNKNOWN:
    case OP_TYPE_LIKE:
    case OP_TYPE_NOT_LIKE:
    case OP_TYPE_MATCH:
    case OP_TYPE_NMATCH:
    case OP_TYPE_IN:
    case OP_TYPE_NOT_IN:
      code = sclGetCompOperatorResType(pOp);
      break;

    // JSON
    case OP_TYPE_JSON_GET_VALUE:
    case OP_TYPE_JSON_CONTAINS:
      code = sclGetJsonOperatorResType(pOp);
      break;

    default:
      break;
  }

  return code;
}