scalar.c 29.4 KB
Newer Older
D
dapan1121 已提交
1 2
#include "function.h"
#include "functionMgt.h"
H
Haojun Liao 已提交
3 4
#include "nodes.h"
#include "querynodes.h"
D
dapan1121 已提交
5
#include "sclInt.h"
H
Haojun Liao 已提交
6 7 8
#include "sclvector.h"
#include "tcommon.h"
#include "tdatablock.h"
9
#include "scalar.h"
S
slzhou 已提交
10
#include "tudf.h"
D
fix bug  
dapan1121 已提交
11
#include "ttime.h"
D
dapan1121 已提交
12

D
dapan1121 已提交
13
/**
 * Report how many operands an operator of the given type takes.
 *
 * The IS [NOT] NULL / TRUE / FALSE / UNKNOWN tests and unary minus take a
 * single operand; every other operator type is reported as binary.
 *
 * @param type operator type to classify
 * @return 1 for the unary operators listed below, 2 otherwise
 */
int32_t scalarGetOperatorParamNum(EOperatorType type) {
  switch (type) {
    case OP_TYPE_IS_NULL:
    case OP_TYPE_IS_NOT_NULL:
    case OP_TYPE_IS_TRUE:
    case OP_TYPE_IS_NOT_TRUE:
    case OP_TYPE_IS_FALSE:
    case OP_TYPE_IS_NOT_FALSE:
    case OP_TYPE_IS_UNKNOWN:
    case OP_TYPE_IS_NOT_UNKNOWN:
    case OP_TYPE_MINUS:
      return 1;
    default:
      return 2;
  }
}

D
dapan 已提交
23 24 25 26 27 28 29 30 31 32 33 34 35
/**
 * Turn a string value node into a timestamp value node in place.
 *
 * The string held in datum.p is parsed with the given precision and the
 * result is written to datum.i; a failed parse stores 0. The string buffer
 * is released and the node's result type becomes TIMESTAMP.
 *
 * @param precision timestamp precision handed to the string parser
 * @param valueNode node to convert; its datum.p buffer is freed here
 */
void sclConvertToTsValueNode(int8_t precision, SValueNode* valueNode) {
  // Remember the buffer before datum.i is written.
  // NOTE(review): presumably datum.p and datum.i share storage - confirm.
  char *srcStr = valueNode->datum.p;

  if (TSDB_CODE_SUCCESS !=
      convertStringToTimestamp(valueNode->node.resType.type, srcStr, precision, &valueNode->datum.i)) {
    valueNode->datum.i = 0;
  }

  taosMemoryFree(srcStr);

  valueNode->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes;
  valueNode->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
}


36
/**
 * Allocate a column descriptor of the given data type with room for
 * numOfRows rows.
 *
 * @param pType     type / bytes / scale / precision copied into the column info
 * @param numOfRows capacity requested from colInfoDataEnsureCapacity
 * @return the new column, or NULL with terrno set to TSDB_CODE_OUT_OF_MEMORY
 *         when either allocation step fails
 */
SColumnInfoData* createColumnInfoData(SDataType* pType, int32_t numOfRows) {
  SColumnInfoData* pCol = taosMemoryCalloc(1, sizeof(SColumnInfoData));
  if (NULL == pCol) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pCol->info.type = pType->type;
  pCol->info.bytes = pType->bytes;
  pCol->info.scale = pType->scale;
  pCol->info.precision = pType->precision;

  if (TSDB_CODE_SUCCESS == colInfoDataEnsureCapacity(pCol, 0, numOfRows)) {
    return pCol;
  }

  // Any capacity failure is reported to the caller as out-of-memory.
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  taosMemoryFree(pCol);
  return NULL;
}

58 59 60 61
/**
 * Convert the constant held by pValueNode into the type described by
 * out->columnData, writing the single converted row into out.
 *
 * A temporary one-row input column is built from the value node, converted
 * with vectorConvertImpl and released before returning.
 *
 * @param pValueNode source constant
 * @param out        destination; out->columnData must already exist and carry
 *                   the target type and bytes
 * @return TSDB_CODE_QRY_OUT_OF_MEMORY if the temporary column cannot be
 *         created, the colInfoDataEnsureCapacity error if the output cannot be
 *         sized, otherwise the result of vectorConvertImpl
 */
int32_t doConvertDataType(SValueNode* pValueNode, SScalarParam* out) {
  SScalarParam in = {.numOfRows = 1};
  in.columnData = createColumnInfoData(&pValueNode->node.resType, 1);
  if (NULL == in.columnData) {
    // Previously the NULL result was passed straight to colDataAppend.
    sclError("create input column failed, type:%d", pValueNode->node.resType.type);
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  colDataAppend(in.columnData, 0, nodesGetValueFromNode(pValueNode), false);

  // Only convert when the output column really has room for the row.
  int32_t code = colInfoDataEnsureCapacity(out->columnData, 0, 1);
  if (TSDB_CODE_SUCCESS == code) {
    code = vectorConvertImpl(&in, out);
  }

  sclFreeParam(&in);
  return code;
}

D
dapan1121 已提交
70 71 72 73 74 75 76 77 78 79 80 81
int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type) {
  SHashObj *pObj = taosHashInit(256, taosGetDefaultHashFunction(type), true, false);
  if (NULL == pObj) {
    sclError("taosHashInit failed, size:%d", 256);
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  taosHashSetEqualFp(pObj, taosGetDefaultEqualFunction(type)); 

  int32_t code = 0;
  SNodeListNode *nodeList = (SNodeListNode *)pNode;
  SListCell *cell = nodeList->pNodeList->pHead;
H
Haojun Liao 已提交
82
  SScalarParam out = {.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData))};
83

D
dapan1121 已提交
84 85 86 87 88
  int32_t len = 0;
  void *buf = NULL;
  
  for (int32_t i = 0; i < nodeList->pNodeList->length; ++i) {
    SValueNode *valueNode = (SValueNode *)cell->pNode;
D
dapan1121 已提交
89
    
D
dapan1121 已提交
90
    if (valueNode->node.resType.type != type) {
91
      out.columnData->info.type = type;
D
dapan1121 已提交
92 93 94 95 96 97 98 99 100
      if (IS_VAR_DATA_TYPE(type)) {
        if (IS_VAR_DATA_TYPE(valueNode->node.resType.type)) {
          out.columnData->info.bytes = valueNode->node.resType.bytes * TSDB_NCHAR_SIZE;
        } else {
          out.columnData->info.bytes = 64 * TSDB_NCHAR_SIZE;
        }
      } else {
        out.columnData->info.bytes = tDataTypes[type].bytes;
      }
101

102 103
      code = doConvertDataType(valueNode, &out);
      if (code != TSDB_CODE_SUCCESS) {
104
//        sclError("convert data from %d to %d failed", in.type, out.type);
D
dapan1121 已提交
105 106 107 108
        SCL_ERR_JRET(code);
      }

      if (IS_VAR_DATA_TYPE(type)) {
D
dapan1121 已提交
109 110 111
        char* data = colDataGetVarData(out.columnData, 0);
        len = varDataLen(data);
        buf = varDataVal(data);
D
dapan1121 已提交
112 113
      } else {
        len = tDataTypes[type].bytes;
114
        buf = out.columnData->pData;
D
dapan1121 已提交
115 116 117
      }
    } else {
      buf = nodesGetValueFromNode(valueNode);
D
dapan1121 已提交
118 119 120 121 122
      if (IS_VAR_DATA_TYPE(type)) {
        len = varDataLen(buf);
        buf = varDataVal(buf);
      } else {
        len = valueNode->node.resType.bytes;
123
      }
D
dapan1121 已提交
124 125
    }
    
126
    if (taosHashPut(pObj, buf, (size_t)len, NULL, 0)) {
D
dapan1121 已提交
127
      sclError("taosHashPut to set failed");
D
dapan1121 已提交
128 129
      SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
D
dapan1121 已提交
130 131

    cell = cell->pNext;
D
dapan1121 已提交
132 133 134 135 136 137 138 139 140 141
  }

  *data = pObj;
  return TSDB_CODE_SUCCESS;

_return:
  taosHashCleanup(pObj);
  SCL_RET(code);
}

D
dapan1121 已提交
142 143 144 145 146 147 148
/**
 * Release every SScalarParam stored in the result hash, then destroy the
 * hash itself.
 *
 * @param res hash whose entries are SScalarParam values
 */
void sclFreeRes(SHashObj *res) {
  for (void *it = taosHashIterate(res, NULL); NULL != it; it = taosHashIterate(res, it)) {
    sclFreeParam((SScalarParam *)it);
  }

  taosHashCleanup(res);
}

S
slzhou 已提交
156 157 158 159 160 161 162 163 164 165 166 167
/**
 * Tear down every UDF handle stored in the hash, then destroy the hash.
 *
 * @param udf2handle hash whose entries are UdfcFuncHandle values
 */
void sclFreeUdfHandles(SHashObj *udf2handle) {
  for (void *it = taosHashIterate(udf2handle, NULL); NULL != it; it = taosHashIterate(udf2handle, it)) {
    UdfcFuncHandle *pHandle = (UdfcFuncHandle *)it;
    teardownUdf(*pHandle);
  }

  taosHashCleanup(udf2handle);
}

D
dapan1121 已提交
168
/**
 * Release the resources referenced by a scalar parameter.
 *
 * Destroys and frees param->columnData and cleans up param->pHashFilter when
 * they are set. Both fields are reset to NULL afterwards so the struct can be
 * inspected, reused or passed here again without touching freed memory. The
 * SScalarParam itself is not freed.
 *
 * @param param parameter to clean; NULL is ignored
 */
void sclFreeParam(SScalarParam *param) {
  if (NULL == param) {
    return;
  }

  if (param->columnData != NULL) {
    colDataDestroy(param->columnData);
    taosMemoryFree(param->columnData);
    param->columnData = NULL;
  }

  if (param->pHashFilter != NULL) {
    taosHashCleanup(param->pHashFilter);
    param->pHashFilter = NULL;
  }
}

D
dapan1121 已提交
179 180 181 182 183
/**
 * Duplicate the raw value of a value node into a freshly allocated buffer.
 *
 * For a NULL-typed node nothing is allocated and *res is left untouched.
 * Otherwise resType.bytes bytes are copied and the buffer is handed to the
 * caller through *res.
 *
 * @param pNode value node to copy from
 * @param res   out: receives the allocated copy
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY on allocation failure
 */
int32_t sclCopyValueNodeValue(SValueNode *pNode, void **res) {
  if (TSDB_DATA_TYPE_NULL == pNode->node.resType.type) {
    return TSDB_CODE_SUCCESS;
  }

  void *copy = taosMemoryMalloc(pNode->node.resType.bytes);
  *res = copy;
  if (NULL == copy) {
    sclError("malloc %d failed", pNode->node.resType.bytes);
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  memcpy(copy, nodesGetValueFromNode(pNode), pNode->node.resType.bytes);
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
194 195 196 197
/**
 * Fill `param` with the input data for one expression node.
 *
 *  - VALUE:      builds a one-row column holding the constant (or a NULL row).
 *  - NODE_LIST:  builds a hash set of the list values in param->pHashFilter
 *                and records the param in ctx->pRes keyed by the node pointer.
 *  - COLUMN:     points param->columnData at the matching column of the
 *                matching data block in ctx->pBlockList.
 *  - FUNCTION / OPERATOR / LOGIC_CONDITION: copies the result previously
 *                stored for the node in ctx->pRes.
 *  - any other node type leaves `param` as it was.
 *
 * Afterwards *rowNum is raised to param->numOfRows when that is larger; two
 * different multi-row counts are rejected.
 *
 * @param node   expression node to read
 * @param param  out: parameter to fill
 * @param ctx    scalar calculation context (block list, result hash)
 * @param rowNum in/out: largest row count seen so far
 * @return TSDB_CODE_SUCCESS or an error code
 */
int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t *rowNum) {
  switch (nodeType(node)) {
    case QUERY_NODE_VALUE: {
      SValueNode *valueNode = (SValueNode *)node;

      param->numOfRows = 1;
      param->columnData = createColumnInfoData(&valueNode->node.resType, 1);
      if (NULL == param->columnData) {
        // Previously the NULL column was handed to colDataAppend*.
        sclError("create value column failed, type:%d", valueNode->node.resType.type);
        SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      if (TSDB_DATA_TYPE_NULL == valueNode->node.resType.type) {
        colDataAppendNULL(param->columnData, 0);
      } else {
        colDataAppend(param->columnData, 0, nodesGetValueFromNode(valueNode), false);
      }
      break;
    }
    case QUERY_NODE_NODE_LIST: {
      SNodeListNode *nodeList = (SNodeListNode *)node;
      if (LIST_LENGTH(nodeList->pNodeList) <= 0) {
        sclError("invalid length in nodeList, length:%d", LIST_LENGTH(nodeList->pNodeList));
        SCL_RET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      SCL_ERR_RET(scalarGenerateSetFromList((void **)&param->pHashFilter, node, nodeList->dataType.type));
      if (taosHashPut(ctx->pRes, &node, POINTER_BYTES, param, sizeof(*param))) {
        taosHashCleanup(param->pHashFilter);
        param->pHashFilter = NULL;  // do not leave a dangling filter behind
        sclError("taosHashPut nodeList failed, size:%d", (int32_t)sizeof(*param));
        return TSDB_CODE_QRY_OUT_OF_MEMORY;
      }
      break;
    }
    case QUERY_NODE_COLUMN: {
      if (NULL == ctx->pBlockList) {
        sclError("invalid node type for constant calculating, type:%d, src:%p", nodeType(node), ctx->pBlockList);
        SCL_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
      }

      SColumnNode *ref = (SColumnNode *)node;

      // Locate the data block whose id matches the column reference.
      int32_t index = -1;
      for (int32_t i = 0; i < taosArrayGetSize(ctx->pBlockList); ++i) {
        SSDataBlock* pb = taosArrayGetP(ctx->pBlockList, i);
        if (pb->info.blockId == ref->dataBlockId) {
          index = i;
          break;
        }
      }

      if (index == -1) {
        sclError("column tupleId is too big, tupleId:%d, dataBlockNum:%d", ref->dataBlockId, (int32_t)taosArrayGetSize(ctx->pBlockList));
        SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      SSDataBlock *block = *(SSDataBlock **)taosArrayGet(ctx->pBlockList, index);
      if (NULL == block || ref->slotId >= block->info.numOfCols) {
        // block may be NULL here, so its column array must not be read blindly.
        int32_t blockSize = (NULL == block) ? 0 : (int32_t)taosArrayGetSize(block->pDataBlock);
        sclError("column slotId is too big, slodId:%d, dataBlockSize:%d", ref->slotId, blockSize);
        SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      SColumnInfoData *columnData = (SColumnInfoData *)taosArrayGet(block->pDataBlock, ref->slotId);
      param->numOfRows = block->info.rows;
      param->columnData = columnData;
      break;
    }
    case QUERY_NODE_FUNCTION:
    case QUERY_NODE_OPERATOR:
    case QUERY_NODE_LOGIC_CONDITION: {
      SScalarParam *res = (SScalarParam *)taosHashGet(ctx->pRes, &node, POINTER_BYTES);
      if (NULL == res) {
        sclError("no result for node, type:%d, node:%p", nodeType(node), node);
        SCL_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
      }
      *param = *res;
      break;
    }
    default:
      break;
  }

  if (param->numOfRows > *rowNum) {
    // A one-row param may be combined with anything; two different
    // multi-row counts cannot.
    if ((1 != param->numOfRows) && (1 < *rowNum)) {
      sclError("different row nums, rowNum:%d, newRowNum:%d", *rowNum, param->numOfRows);
      SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    *rowNum = param->numOfRows;
  }

  param->param = ctx->param;
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
284
/**
 * Allocate and initialise the parameter array for a function or logic node.
 *
 * Without a parameter list a single placeholder parameter is produced whose
 * row count is taken from the first data block (or 1 when there are no
 * blocks). With a list, every entry is initialised through sclInitParam;
 * during constant calculation non-constant entries are skipped (their slot
 * stays zeroed) and constant entries are erased from the list once read.
 *
 * @param pParams    out: receives the array, or NULL when the row count is 0
 * @param pParamList parameter nodes, may be NULL
 * @param ctx        scalar calculation context
 * @param paramNum   out: number of slots in the array
 * @param rowNum     in/out: row count shared by the parameters
 * @return TSDB_CODE_SUCCESS or an error code (the array is freed on error)
 */
int32_t sclInitParamList(SScalarParam **pParams, SNodeList* pParamList, SScalarCtx *ctx, int32_t *paramNum, int32_t *rowNum) {
  int32_t code = 0;
  if (NULL == pParamList) {
    if (ctx->pBlockList) {
      // The block list stores SSDataBlock pointers, so the element itself
      // (taosArrayGetP) is needed, not the address of the slot.
      SSDataBlock *pBlock = taosArrayGetP(ctx->pBlockList, 0);
      if (NULL == pBlock) {
        sclError("invalid data block in block list, dataBlockNum:%d", (int32_t)taosArrayGetSize(ctx->pBlockList));
        SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
      }
      *rowNum = pBlock->info.rows;
    } else {
      *rowNum = 1;
    }

    *paramNum = 1;
  } else {
    *paramNum = pParamList->length;
  }

  SScalarParam *paramList = taosMemoryCalloc(*paramNum, sizeof(SScalarParam));
  if (NULL == paramList) {
    sclError("calloc %d failed", (int32_t)((*paramNum) * sizeof(SScalarParam)));
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (pParamList) {
    SNode *tnode = NULL;
    int32_t i = 0;
    if (SCL_IS_CONST_CALC(ctx)) {
      WHERE_EACH (tnode, pParamList) {
        if (!SCL_IS_CONST_NODE(tnode)) {
          WHERE_NEXT;
        } else {
          SCL_ERR_JRET(sclInitParam(tnode, &paramList[i], ctx, rowNum));
          ERASE_NODE(pParamList);
        }

        // i follows the original list position, including skipped entries.
        ++i;
      }
    } else {
      FOREACH(tnode, pParamList) {
        SCL_ERR_JRET(sclInitParam(tnode, &paramList[i], ctx, rowNum));
        ++i;
      }
    }
  } else {
    paramList[0].numOfRows = *rowNum;
  }

  // With no rows there is nothing to compute: hand back a NULL array.
  if (0 == *rowNum) {
    taosMemoryFreeClear(paramList);
  }

  *pParams = paramList;
  return TSDB_CODE_SUCCESS;

_return:
  taosMemoryFreeClear(paramList);
  SCL_RET(code);
}

/**
 * Allocate and initialise the operand array of an operator node.
 *
 * The left operand is always required; the right one only for operators
 * that scalarGetOperatorParamNum reports as binary.
 *
 * @param pParams out: receives the operand array on success
 * @param node    operator node whose pLeft / pRight are read
 * @param ctx     scalar calculation context
 * @param rowNum  in/out: row count shared by the operands
 * @return TSDB_CODE_SUCCESS or an error code (the array is freed on error)
 */
int32_t sclInitOperatorParams(SScalarParam **pParams, SOperatorNode *node, SScalarCtx *ctx, int32_t *rowNum) {
  int32_t code = 0;
  int32_t paramNum = scalarGetOperatorParamNum(node->opType);

  if (NULL == node->pLeft || (paramNum == 2 && NULL == node->pRight)) {
    sclError("invalid operation node, left:%p, right:%p", node->pLeft, node->pRight);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SScalarParam *operands = taosMemoryCalloc(paramNum, sizeof(SScalarParam));
  if (NULL == operands) {
    sclError("calloc %d failed", (int32_t)(paramNum * sizeof(SScalarParam)));
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCL_ERR_JRET(sclInitParam(node->pLeft, &operands[0], ctx, rowNum));
  if (1 < paramNum) {
    SCL_ERR_JRET(sclInitParam(node->pRight, &operands[1], ctx, rowNum));
  }

  *pParams = operands;
  return TSDB_CODE_SUCCESS;

_return:
  taosMemoryFreeClear(operands);
  SCL_RET(code);
}

368
/**
 * Execute a scalar function node and store its result in `output`.
 *
 * User-defined functions are run through the UDF client: the handle is
 * looked up in ctx->udf2Handle (when present) or set up on demand and then
 * cached there. Built-in functions get an output column sized for the row
 * count and are run through the process callback from
 * fmGetScalarFuncExecFuncs.
 *
 * @param node   function node to execute
 * @param ctx    scalar calculation context
 * @param output out: receives the result
 * @return TSDB_CODE_SUCCESS or an error code
 */
int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *output) {
  SScalarParam *params = NULL;
  int32_t rowNum = 0;
  int32_t paramNum = 0;
  int32_t code = 0;
  SCL_ERR_RET(sclInitParamList(&params, node->pParameterList, ctx, &paramNum, &rowNum));

  if (fmIsUserDefinedFunc(node->funcId)) {
    UdfcFuncHandle udfHandle = NULL;
    char* udfName = node->functionName;
    if (ctx->udf2Handle) {
      UdfcFuncHandle *pHandle = taosHashGet(ctx->udf2Handle, udfName, strlen(udfName));
      if (pHandle) {
        udfHandle = *pHandle;
      }
    }
    if (udfHandle == NULL) {
      code = setupUdf(udfName, &udfHandle);
      if (code != 0) {
        sclError("fmExecFunction error. setupUdf. function name: %s, code:%d", udfName, code);
        goto _return;
      }
      if (ctx->udf2Handle) {
        taosHashPut(ctx->udf2Handle, udfName, strlen(udfName), &udfHandle, sizeof(UdfcFuncHandle));
      }
    }
    code = callUdfScalarFunc(udfHandle, params, paramNum, output);
    if (code != 0) {
      sclError("fmExecFunction error. callUdfScalarFunc. function name: %s, udf code:%d", node->functionName, code);
      goto _return;
    }
  } else {
    SScalarFuncExecFuncs ffpSet = {0};
    code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet);
    if (code) {
      sclError("fmGetFuncExecFuncs failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
      SCL_ERR_JRET(code);
    }

    output->columnData = createColumnInfoData(&node->node.resType, rowNum);
    if (output->columnData == NULL) {
      // output->columnData is NULL here, so the size must come from the node.
      sclError("calloc %d failed", (int32_t)(rowNum * node->node.resType.bytes));
      SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    code = (*ffpSet.process)(params, paramNum, output);
    if (code) {
      sclError("scalar function exec failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
      SCL_ERR_JRET(code);
    }
  }

_return:
  // NOTE(review): the original only released the params array here (the
  // per-param cleanup was commented out); that is kept as is.
  taosMemoryFreeClear(params);
  SCL_RET(code);
}

/**
 * Evaluate an AND / OR / NOT logic condition row by row.
 *
 * Parameters without column data (skipped during constant calculation) mark
 * a row as incomplete unless another parameter already decides the result
 * (a false operand for AND, a true operand for OR). Complete rows are
 * written to a boolean output column.
 *
 * During constant calculation, if the last row could not be decided, the
 * output column is released and output->numOfRows is set to 0.
 *
 * @param node   logic condition node; its result type must be BOOL
 * @param ctx    scalar calculation context
 * @param output out: receives the boolean column and row count
 * @return TSDB_CODE_SUCCESS or an error code
 */
int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *output) {
  if (NULL == node->pParameterList || node->pParameterList->length <= 0) {
    sclError("invalid logic parameter list, list:%p, paramNum:%d", node->pParameterList, node->pParameterList ? node->pParameterList->length : 0);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (TSDB_DATA_TYPE_BOOL != node->node.resType.type) {
    sclError("invalid logic resType, type:%d", node->node.resType.type);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (LOGIC_COND_TYPE_NOT == node->condType && node->pParameterList->length > 1) {
    sclError("invalid NOT operation parameter number, paramNum:%d", node->pParameterList->length);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SScalarParam *params = NULL;
  int32_t rowNum = 0;
  int32_t paramNum = 0;
  int32_t code = 0;
  SCL_ERR_RET(sclInitParamList(&params, node->pParameterList, ctx, &paramNum, &rowNum));
  if (NULL == params) {
    // No rows to evaluate.
    output->numOfRows = 0;
    return TSDB_CODE_SUCCESS;
  }

  int32_t type = node->node.resType.type;
  output->numOfRows = rowNum;

  SDataType t = {.type = type, .bytes = tDataTypes[type].bytes};
  output->columnData = createColumnInfoData(&t, rowNum);
  if (output->columnData == NULL) {
    sclError("calloc %d failed", (int32_t)(rowNum * sizeof(bool)));
    SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  bool value = false;
  bool complete = true;
  for (int32_t i = 0; i < rowNum; ++i) {
    complete = true;
    for (int32_t m = 0; m < paramNum; ++m) {
      if (NULL == params[m].columnData) {
        complete = false;
        continue;
      }
      char* p = colDataGetData(params[m].columnData, i);
      GET_TYPED_DATA(value, bool, params[m].columnData->info.type, p);

      // Short-circuit: one decisive operand settles the row even when
      // other operands are unknown.
      if (LOGIC_COND_TYPE_AND == node->condType && (false == value)) {
        complete = true;
        break;
      } else if (LOGIC_COND_TYPE_OR == node->condType && value) {
        complete = true;
        break;
      } else if (LOGIC_COND_TYPE_NOT == node->condType) {
        value = !value;
      }
    }

    if (complete) {
      colDataAppend(output->columnData, i, (char*) &value, false);
    }
  }

  if (SCL_IS_CONST_CALC(ctx) && (false == complete)) {
    sclFreeParam(output);
    // The caller still owns `output`; make sure it holds no freed pointers.
    output->columnData = NULL;
    output->pHashFilter = NULL;
    output->numOfRows = 0;
  }

_return:
  // NOTE(review): the original only released the params array here (the
  // per-param cleanup was commented out); that is kept as is.
  taosMemoryFreeClear(params);
  SCL_RET(code);
}

/**
 * Execute an operator node and store its result column in `output`.
 *
 * The operands are initialised, an output column of the node's result type
 * is created for the resulting row count, and the operator function
 * returned by getBinScalarOperatorFn is run in ascending order.
 *
 * @param node   operator node to execute
 * @param ctx    scalar calculation context
 * @param output out: receives the result column
 * @return TSDB_CODE_SUCCESS or an error code
 */
int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *output) {
  SScalarParam *params = NULL;
  int32_t rowNum = 0;
  int32_t code = 0;
  // Computed up front: the original initialised it after a possible jump to
  // _return and then read it there.
  int32_t paramNum = scalarGetOperatorParamNum(node->opType);

  SCL_ERR_RET(sclInitOperatorParams(&params, node, ctx, &rowNum));
  output->columnData = createColumnInfoData(&node->node.resType, rowNum);
  if (output->columnData == NULL) {
    sclError("calloc failed, size:%d", (int32_t)rowNum * node->node.resType.bytes);
    SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  _bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(node->opType);
  if (NULL == OperatorFn) {
    // NOTE(review): assumes an unsupported opType can yield NULL - confirm.
    sclError("no operator function for opType:%d", node->opType);
    SCL_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
  }

  SScalarParam* pLeft = &params[0];
  SScalarParam* pRight = paramNum > 1 ? &params[1] : NULL;

  OperatorFn(pLeft, pRight, output, TSDB_ORDER_ASC);

_return:
  // NOTE(review): the original only released the params array here (the
  // per-param cleanup was commented out); that is kept as is.
  taosMemoryFreeClear(params);
  SCL_RET(code);
}

D
dapan1121 已提交
538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559
/**
 * Replace *pNode by a constant value node chosen from the operator type.
 *
 * Operator types up to OP_TYPE_CALC_MAX are replaced by a NULL-typed value;
 * all other operator types are replaced by the boolean constant false.
 * The original node is destroyed.
 *
 * @param pNode  in/out: node to replace
 * @param ctx    context; ctx->code is set when the new node cannot be made
 * @param opType operator type of the node being replaced
 * @return DEAL_RES_CONTINUE, or DEAL_RES_ERROR on allocation failure
 */
EDealRes sclRewriteBasedOnOptr(SNode** pNode, SScalarCtx *ctx, EOperatorType opType) {
  SValueNode *replacement = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
  if (NULL == replacement) {
    sclError("make value node failed");
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  if (opType > OP_TYPE_CALC_MAX) {
    replacement->node.resType.type = TSDB_DATA_TYPE_BOOL;
    replacement->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes;
    replacement->datum.b = false;
  } else {
    replacement->node.resType.type = TSDB_DATA_TYPE_NULL;
  }

  nodesDestroyNode(*pNode);
  *pNode = (SNode*)replacement;

  return DEAL_RES_CONTINUE;
}


D
dapan 已提交
571
/**
 * Simplify an operator node that still has a non-constant operand.
 *
 *  - A NULL constant on either side (except for IS NULL / IS NOT NULL)
 *    replaces the whole operator through sclRewriteBasedOnOptr.
 *  - A string constant compared with a timestamp expression is converted to
 *    a timestamp constant using the other side's precision.
 *  - For a right-hand node list, NULL entries are erased for IN; any NULL
 *    entry for another operator, or a list that ends up empty, replaces the
 *    whole operator through sclRewriteBasedOnOptr.
 *
 * @param pNode in/out: operator node, possibly replaced
 * @param ctx   scalar calculation context
 * @return DEAL_RES_CONTINUE, or the result of sclRewriteBasedOnOptr
 */
EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) {
  SOperatorNode *opNode = (SOperatorNode *)*pNode;
  bool isNullTest = (OP_TYPE_IS_NULL == opNode->opType) || (OP_TYPE_IS_NOT_NULL == opNode->opType);

  if (opNode->pLeft && (QUERY_NODE_VALUE == nodeType(opNode->pLeft))) {
    SValueNode *leftVal = (SValueNode *)opNode->pLeft;
    if (!isNullTest && SCL_IS_NULL_VALUE_NODE(leftVal)) {
      return sclRewriteBasedOnOptr(pNode, ctx, opNode->opType);
    }

    if (IS_STR_DATA_TYPE(leftVal->node.resType.type) && opNode->pRight && nodesIsExprNode(opNode->pRight)
      && ((SExprNode*)opNode->pRight)->resType.type == TSDB_DATA_TYPE_TIMESTAMP) {
      sclConvertToTsValueNode(((SExprNode*)opNode->pRight)->resType.precision, leftVal);
    }
  }

  if (opNode->pRight && (QUERY_NODE_VALUE == nodeType(opNode->pRight))) {
    SValueNode *rightVal = (SValueNode *)opNode->pRight;
    if (!isNullTest && SCL_IS_NULL_VALUE_NODE(rightVal)) {
      return sclRewriteBasedOnOptr(pNode, ctx, opNode->opType);
    }

    if (IS_STR_DATA_TYPE(rightVal->node.resType.type) && opNode->pLeft && nodesIsExprNode(opNode->pLeft)
      && ((SExprNode*)opNode->pLeft)->resType.type == TSDB_DATA_TYPE_TIMESTAMP) {
      sclConvertToTsValueNode(((SExprNode*)opNode->pLeft)->resType.precision, rightVal);
    }
  }

  if (opNode->pRight && (QUERY_NODE_NODE_LIST == nodeType(opNode->pRight))) {
    SNodeListNode *values = (SNodeListNode *)opNode->pRight;
    SNode* item = NULL;
    WHERE_EACH(item, values->pNodeList) {
      if (!SCL_IS_NULL_VALUE_NODE(item)) {
        WHERE_NEXT;
        continue;
      }

      if (opNode->opType != OP_TYPE_IN) {  // OP_TYPE_NOT_IN
        return sclRewriteBasedOnOptr(pNode, ctx, opNode->opType);
      }

      // IN: a NULL entry is simply dropped from the list.
      ERASE_NODE(values->pNodeList);
    }

    if (values->pNodeList->length <= 0) {
      return sclRewriteBasedOnOptr(pNode, ctx, opNode->opType);
    }
  }

  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
622
/**
 * Constant-fold a scalar function whose parameters are all constant.
 *
 * Non-scalar functions and functions with a non-constant parameter are left
 * untouched. Otherwise the function is executed once and *pNode is replaced
 * by a value node holding the result (a NULL-typed node when the result row
 * is NULL).
 *
 * @param pNode in/out: function node, replaced on success
 * @param ctx   scalar calculation context; ctx->code carries the error
 * @return DEAL_RES_CONTINUE, or DEAL_RES_ERROR with ctx->code set
 */
EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) {
  SFunctionNode *node = (SFunctionNode *)*pNode;
  SNode* tnode = NULL;
  if (!fmIsScalarFunc(node->funcId)) {
    return DEAL_RES_CONTINUE;
  }

  FOREACH(tnode, node->pParameterList) {
    if (!SCL_IS_CONST_NODE(tnode)) {
      return DEAL_RES_CONTINUE;
    }
  }

  SScalarParam output = {0};

  ctx->code = sclExecFunction(node, ctx, &output);
  if (ctx->code) {
    // The output column may already exist when execution fails late.
    sclFreeParam(&output);
    return DEAL_RES_ERROR;
  }

  SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
  if (NULL == res) {
    sclError("make value node failed");
    sclFreeParam(&output);
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  res->translate = true;

  if (colDataIsNull_s(output.columnData, 0)) {
    res->node.resType.type = TSDB_DATA_TYPE_NULL;
  } else {
    res->node.resType.type = output.columnData->info.type;
    res->node.resType.bytes = output.columnData->info.bytes;
    res->node.resType.scale = output.columnData->info.scale;
    res->node.resType.precision = output.columnData->info.precision;
    int32_t type = output.columnData->info.type;
    if (IS_VAR_DATA_TYPE(type)) {
      res->datum.p = taosMemoryCalloc(res->node.resType.bytes + VARSTR_HEADER_SIZE + 1, 1);
      if (NULL == res->datum.p) {
        // Previously the NULL buffer was passed straight to memcpy.
        sclError("calloc %d failed", (int32_t)(res->node.resType.bytes + VARSTR_HEADER_SIZE + 1));
        nodesDestroyNode((SNode*)res);
        sclFreeParam(&output);
        ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
        return DEAL_RES_ERROR;
      }
      memcpy(res->datum.p, output.columnData->pData, varDataTLen(output.columnData->pData));
    } else {
      nodesSetValueNodeValue(res, output.columnData->pData);
    }
  }

  nodesDestroyNode(*pNode);
  *pNode = (SNode*)res;

  sclFreeParam(&output);
  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
675
/**
 * Constant-fold a logic condition node.
 *
 * The condition is evaluated through sclExecLogic. When it yields no rows
 * the node is left as it is; otherwise *pNode is replaced by a value node
 * carrying the node's result type and the computed value.
 *
 * @param pNode in/out: logic condition node, replaced when it can be folded
 * @param ctx   scalar calculation context; ctx->code carries the error
 * @return DEAL_RES_CONTINUE, or DEAL_RES_ERROR with ctx->code set
 */
EDealRes sclRewriteLogic(SNode** pNode, SScalarCtx *ctx) {
  SLogicConditionNode *logicNode = (SLogicConditionNode *)*pNode;
  SScalarParam         result = {0};

  ctx->code = sclExecLogic(logicNode, ctx, &result);
  if (ctx->code) {
    return DEAL_RES_ERROR;
  }

  // Nothing could be decided: keep the node unchanged.
  if (0 == result.numOfRows) {
    return DEAL_RES_CONTINUE;
  }

  SValueNode *folded = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
  if (NULL == folded) {
    sclError("make value node failed");
    sclFreeParam(&result);
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  folded->translate = true;
  folded->node.resType = logicNode->node.resType;

  int32_t type = result.columnData->info.type;
  if (!IS_VAR_DATA_TYPE(type)) {
    nodesSetValueNodeValue(folded, result.columnData->pData);
  } else {
    // Hand the buffer over to the value node instead of copying it.
    folded->datum.p = result.columnData->pData;
    result.columnData->pData = NULL;
  }

  nodesDestroyNode(*pNode);
  *pNode = (SNode*)folded;

  sclFreeParam(&result);
  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
714
/**
 * Constant-fold an operator node.
 *
 * When an operand is not constant the node is handed to
 * sclRewriteNonConstOperator. Otherwise the operator is executed once and
 * *pNode is replaced by a value node holding the result (a NULL-typed node
 * when the result row is NULL).
 *
 * @param pNode in/out: operator node, possibly replaced
 * @param ctx   scalar calculation context; ctx->code carries the error
 * @return DEAL_RES_CONTINUE, DEAL_RES_ERROR with ctx->code set, or the
 *         result of sclRewriteNonConstOperator
 */
EDealRes sclRewriteOperator(SNode** pNode, SScalarCtx *ctx) {
  SOperatorNode *node = (SOperatorNode *)*pNode;

  if ((!SCL_IS_CONST_NODE(node->pLeft)) || (!SCL_IS_CONST_NODE(node->pRight))) {
    return sclRewriteNonConstOperator(pNode, ctx);
  }

  // sclExecOperator creates the output column itself; pre-allocating one
  // here (as before) only leaked it when the pointer was overwritten.
  SScalarParam output = {0};
  ctx->code = sclExecOperator(node, ctx, &output);
  if (ctx->code) {
    sclFreeParam(&output);
    return DEAL_RES_ERROR;
  }

  SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
  if (NULL == res) {
    sclError("make value node failed");
    sclFreeParam(&output);
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  res->translate = true;

  if (colDataIsNull_s(output.columnData, 0)) {
    res->node.resType.type = TSDB_DATA_TYPE_NULL;
  } else {
    res->node.resType = node->node.resType;
    int32_t type = output.columnData->info.type;
    if (IS_VAR_DATA_TYPE(type)) {  // todo refactor
      // Hand the buffer over to the value node instead of copying it.
      res->datum.p = output.columnData->pData;
      output.columnData->pData = NULL;
    } else {
      nodesSetValueNodeValue(res, output.columnData->pData);
    }
  }

  nodesDestroyNode(*pNode);
  *pNode = (SNode*)res;

  sclFreeParam(&output);
  return DEAL_RES_CONTINUE;
}

/**
 * Rewriter callback that dispatches on the node type: function, logic
 * condition and operator nodes go to their sclRewrite* handler, every other
 * node is left untouched.
 *
 * @param pNode    in/out: node being visited
 * @param pContext the SScalarCtx of the calculation
 * @return the handler's result, or DEAL_RES_CONTINUE for other node types
 */
EDealRes sclConstantsRewriter(SNode** pNode, void* pContext) {
  SScalarCtx *ctx = (SScalarCtx *)pContext;

  switch (nodeType(*pNode)) {
    case QUERY_NODE_FUNCTION:
      return sclRewriteFunction(pNode, ctx);
    case QUERY_NODE_LOGIC_CONDITION:
      return sclRewriteLogic(pNode, ctx);
    case QUERY_NODE_OPERATOR:
      return sclRewriteOperator(pNode, ctx);
    default:
      return DEAL_RES_CONTINUE;
  }
}

D
dapan 已提交
775
/**
 * Execute a function node and store its result in ctx->pRes, keyed by the
 * node pointer.
 *
 * @param pNode function node being visited
 * @param ctx   scalar calculation context; ctx->code carries the error
 * @return DEAL_RES_CONTINUE, or DEAL_RES_ERROR with ctx->code set
 */
EDealRes sclWalkFunction(SNode* pNode, SScalarCtx *ctx) {
  SFunctionNode *node = (SFunctionNode *)pNode;
  SScalarParam output = {0};

  ctx->code = sclExecFunction(node, ctx, &output);
  if (ctx->code) {
    // The output column may already exist when execution fails late.
    sclFreeParam(&output);
    return DEAL_RES_ERROR;
  }

  if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
    // The result was not stored, so nobody else will release it.
    sclFreeParam(&output);
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
792
/**
 * Evaluate a logic condition node and store its result in ctx->pRes, keyed
 * by the node pointer.
 *
 * @param pNode logic condition node being visited
 * @param ctx   scalar calculation context; ctx->code carries the error
 * @return DEAL_RES_CONTINUE, or DEAL_RES_ERROR with ctx->code set
 */
EDealRes sclWalkLogic(SNode* pNode, SScalarCtx *ctx) {
  SLogicConditionNode *node = (SLogicConditionNode *)pNode;
  SScalarParam output = {0};

  ctx->code = sclExecLogic(node, ctx, &output);
  if (ctx->code) {
    return DEAL_RES_ERROR;
  }

  if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
    // The result was not stored, so nobody else will release it.
    sclFreeParam(&output);
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
809
/**
 * Execute an operator node and store its result in ctx->pRes, keyed by the
 * node pointer.
 *
 * @param pNode operator node being visited
 * @param ctx   scalar calculation context; ctx->code carries the error
 * @return DEAL_RES_CONTINUE, or DEAL_RES_ERROR with ctx->code set
 */
EDealRes sclWalkOperator(SNode* pNode, SScalarCtx *ctx) {
  SOperatorNode *node = (SOperatorNode *)pNode;
  SScalarParam output = {0};

  ctx->code = sclExecOperator(node, ctx, &output);
  if (ctx->code) {
    // Release whatever was allocated for the output before the failure.
    sclFreeParam(&output);
    return DEAL_RES_ERROR;
  }

  if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
    // The result was not stored, so nobody else will release it.
    sclFreeParam(&output);
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  return DEAL_RES_CONTINUE;
}

826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874
// Walker callback for target nodes.
// Looks up the data block whose blockId equals target->dataBlockId, then
// copies the cached result of target->pExpr (taken from ctx->pRes) into the
// column at target->slotId and sets the block's row count from that result.
// The cached result is freed and removed from ctx->pRes afterwards.
//
// Returns DEAL_RES_CONTINUE on success; on any validation failure sets
// ctx->code and returns DEAL_RES_ERROR.
EDealRes sclWalkTarget(SNode* pNode, SScalarCtx *ctx) {
  STargetNode *target = (STargetNode *)pNode;

  if (target->dataBlockId >= taosArrayGetSize(ctx->pBlockList)) {
    sclError("target tupleId is too big, tupleId:%d, dataBlockNum:%d", target->dataBlockId, (int32_t)taosArrayGetSize(ctx->pBlockList));
    ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
    return DEAL_RES_ERROR;
  }

  // Blocks are matched by id, not by position in the list.
  SSDataBlock *block = NULL;
  for (int32_t pos = 0; pos < taosArrayGetSize(ctx->pBlockList); ++pos) {
    SSDataBlock *candidate = taosArrayGetP(ctx->pBlockList, pos);
    if (candidate->info.blockId == target->dataBlockId) {
      block = candidate;
      break;
    }
  }

  if (NULL == block) {
    sclError("column tupleId is too big, tupleId:%d, dataBlockNum:%d", target->dataBlockId, (int32_t)taosArrayGetSize(ctx->pBlockList));
    ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
    return DEAL_RES_ERROR;
  }

  // Reject slots beyond the columns the block actually holds.
  if (target->slotId >= taosArrayGetSize(block->pDataBlock)) {
    sclError("target slot not exist, dataBlockId:%d, slotId:%d, dataBlockNum:%d", target->dataBlockId, target->slotId, (int32_t)taosArrayGetSize(block->pDataBlock));
    ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
    return DEAL_RES_ERROR;
  }

  SColumnInfoData *dstCol = taosArrayGet(block->pDataBlock, target->slotId);

  SScalarParam *cached = (SScalarParam *)taosHashGet(ctx->pRes, (void *)&target->pExpr, POINTER_BYTES);
  if (NULL == cached) {
    sclError("no valid res in hash, node:%p, type:%d", target->pExpr, nodeType(target->pExpr));
    ctx->code = TSDB_CODE_QRY_APP_ERROR;
    return DEAL_RES_ERROR;
  }

  colDataAssign(dstCol, cached->columnData, cached->numOfRows);
  block->info.rows = cached->numOfRows;

  // Free the cached result before dropping its hash entry.
  sclFreeParam(cached);
  taosHashRemove(ctx->pRes, (void *)&target->pExpr, POINTER_BYTES);
  return DEAL_RES_CONTINUE;
}
D
dapan 已提交
875

D
dapan1121 已提交
876
// Post-order walker used by scalarCalculate: dispatches each node to the
// handler for its type.
//
// Value, node-list and column nodes need no evaluation and are skipped.
// Function, logic-condition, operator and target nodes are forwarded to their
// sclWalk* handler. Any other node type is reported as invalid input through
// ctx->code.
//
// pContext is the SScalarCtx supplied by the caller of the walk.
EDealRes sclCalcWalker(SNode* pNode, void* pContext) {
  SScalarCtx *ctx = (SScalarCtx *)pContext;

  switch (nodeType(pNode)) {
    case QUERY_NODE_VALUE:
    case QUERY_NODE_NODE_LIST:
    case QUERY_NODE_COLUMN:
      return DEAL_RES_CONTINUE;

    case QUERY_NODE_FUNCTION:
      return sclWalkFunction(pNode, ctx);

    case QUERY_NODE_LOGIC_CONDITION:
      return sclWalkLogic(pNode, ctx);

    case QUERY_NODE_OPERATOR:
      return sclWalkOperator(pNode, ctx);

    case QUERY_NODE_TARGET:
      return sclWalkTarget(pNode, ctx);

    default:
      break;
  }

  sclError("invalid node type for scalar calculating, type:%d", nodeType(pNode));
  ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
  return DEAL_RES_ERROR;
}

// Rewrites the expression tree rooted at pNode with sclConstantsRewriter
// (post-order) and hands the resulting root back through pRes.
//
// pNode: root of the expression tree; must not be NULL.
// pRes:  receives the (possibly replaced) root on success; must not be NULL.
//
// Returns 0 on success, TSDB_CODE_QRY_INVALID_INPUT for NULL arguments,
// TSDB_CODE_QRY_OUT_OF_MEMORY if a working hash map cannot be created, or the
// error recorded in the context by the rewriter. *pRes is written only on
// success.
int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes) {
  if (NULL == pNode || NULL == pRes) {
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t code = 0;
  SScalarCtx ctx = {0};
  ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (NULL == ctx.pRes) {
    sclError("taosHashInit result map failed, num:%d", SCL_DEFAULT_OP_NUM);
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
  ctx.udf2Handle = taosHashInit(SCL_DEFAULT_UDF_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  if (NULL == ctx.udf2Handle) {
    sclError("taosHashInit udf to handle map failed, num:%d", SCL_DEFAULT_UDF_NUM);
    // The result map already exists at this point; release it before bailing
    // out, since this early return skips the _return cleanup.
    sclFreeRes(ctx.pRes);
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  nodesRewriteExprPostOrder(&pNode, sclConstantsRewriter, (void *)&ctx);
  SCL_ERR_JRET(ctx.code);
  *pRes = pNode;

_return:
  sclFreeUdfHandles(ctx.udf2Handle);
  sclFreeRes(ctx.pRes);
  return code;
}

D
dapan 已提交
930
// Evaluates the expression tree rooted at pNode against the given data blocks.
//
// pNode:      root of the expression tree; must not be NULL.
// pBlockList: array of data blocks the expression reads from / writes to;
//             must not be NULL.
// pDst:       optional. When non-NULL, its param is forwarded to the context
//             and the root node's result is copied into pDst->columnData,
//             with pDst->numOfRows updated. When NULL, no result is copied out.
//
// Returns 0 on success, TSDB_CODE_QRY_INVALID_INPUT for NULL pNode/pBlockList,
// TSDB_CODE_QRY_OUT_OF_MEMORY if a working hash map cannot be created,
// TSDB_CODE_QRY_APP_ERROR if the root result is missing, or the error
// recorded in the context by the walker.
int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
  if (NULL == pNode || NULL == pBlockList) {
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t code = 0;
  // pDst is optional (see the check further down), so it must not be
  // dereferenced unconditionally here.
  SScalarCtx ctx = {.code = 0, .pBlockList = pBlockList, .param = pDst ? pDst->param : NULL};

  // TODO: OPT performance
  ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (NULL == ctx.pRes) {
    sclError("taosHashInit result map failed, num:%d", SCL_DEFAULT_OP_NUM);
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
  ctx.udf2Handle = taosHashInit(SCL_DEFAULT_UDF_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  if (NULL == ctx.udf2Handle) {
    sclError("taosHashInit udf to handle map failed, num:%d", SCL_DEFAULT_UDF_NUM);
    // The result map already exists at this point; release it before bailing
    // out, since this early return skips the _return cleanup.
    sclFreeRes(ctx.pRes);
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  nodesWalkExprPostOrder(pNode, sclCalcWalker, (void *)&ctx);
  SCL_ERR_JRET(ctx.code);

  if (pDst) {
    SScalarParam *res = (SScalarParam *)taosHashGet(ctx.pRes, (void *)&pNode, POINTER_BYTES);
    if (NULL == res) {
      sclError("no valid res in hash, node:%p, type:%d", pNode, nodeType(pNode));
      SCL_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }

    colDataAssign(pDst->columnData, res->columnData, res->numOfRows);
    pDst->numOfRows = res->numOfRows;

    // Free the cached result before removing its entry, as sclWalkTarget
    // does; once removed, the final sclFreeRes can no longer reach it.
    sclFreeParam(res);
    taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES);
  }

_return:
  sclFreeUdfHandles(ctx.udf2Handle);
  sclFreeRes(ctx.pRes);
  return code;
}