scalar.c 28.2 KB
Newer Older
D
dapan1121 已提交
1 2
#include "function.h"
#include "functionMgt.h"
H
Haojun Liao 已提交
3 4
#include "nodes.h"
#include "querynodes.h"
D
dapan1121 已提交
5
#include "sclInt.h"
H
Haojun Liao 已提交
6 7 8
#include "sclvector.h"
#include "tcommon.h"
#include "tdatablock.h"
9
#include "scalar.h"
S
slzhou 已提交
10
#include "tudf.h"
D
fix bug  
dapan1121 已提交
11
#include "ttime.h"
D
dapan1121 已提交
12

D
dapan1121 已提交
13
int32_t scalarGetOperatorParamNum(EOperatorType type) {
D
dapan1121 已提交
14
  if (OP_TYPE_IS_NULL == type || OP_TYPE_IS_NOT_NULL == type || OP_TYPE_IS_TRUE == type || OP_TYPE_IS_NOT_TRUE == type 
D
dapan1121 已提交
15 16
   || OP_TYPE_IS_FALSE == type || OP_TYPE_IS_NOT_FALSE == type || OP_TYPE_IS_UNKNOWN == type || OP_TYPE_IS_NOT_UNKNOWN == type
   || OP_TYPE_MINUS == type) {
D
dapan1121 已提交
17 18 19 20 21 22
    return 1;
  }

  return 2;
}

D
dapan 已提交
23 24 25 26 27 28 29 30 31 32 33 34 35
void sclConvertToTsValueNode(int8_t precision, SValueNode* valueNode) {
  char *timeStr = valueNode->datum.p;
  if (convertStringToTimestamp(valueNode->node.resType.type, valueNode->datum.p, precision, &valueNode->datum.i) !=
      TSDB_CODE_SUCCESS) {
    valueNode->datum.i = 0;
  }
  taosMemoryFree(timeStr);
  
  valueNode->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
  valueNode->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes;
}


36
SColumnInfoData* createColumnInfoData(SDataType* pType, int32_t numOfRows) {
H
Haojun Liao 已提交
37
  SColumnInfoData* pColumnData = taosMemoryCalloc(1, sizeof(SColumnInfoData));
38 39 40 41 42 43 44 45 46 47
  if (pColumnData == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pColumnData->info.type      = pType->type;
  pColumnData->info.bytes     = pType->bytes;
  pColumnData->info.scale     = pType->scale;
  pColumnData->info.precision = pType->precision;

48
  int32_t code = colInfoDataEnsureCapacity(pColumnData, 0, numOfRows);
49 50
  if (code != TSDB_CODE_SUCCESS) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
H
Haojun Liao 已提交
51
    taosMemoryFree(pColumnData);
52 53 54 55 56 57
    return NULL;
  } else {
    return pColumnData;
  }
}

58 59 60 61
int32_t doConvertDataType(SValueNode* pValueNode, SScalarParam* out) {
  SScalarParam in = {.numOfRows = 1};
  in.columnData = createColumnInfoData(&pValueNode->node.resType, 1);
  colDataAppend(in.columnData, 0, nodesGetValueFromNode(pValueNode), false);
62

63
  colInfoDataEnsureCapacity(out->columnData, 0, 1);
64 65 66 67
  int32_t code = vectorConvertImpl(&in, out);
  sclFreeParam(&in);

  return code;
68 69
}

D
dapan1121 已提交
70 71 72 73 74 75 76 77 78 79 80 81
int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type) {
  SHashObj *pObj = taosHashInit(256, taosGetDefaultHashFunction(type), true, false);
  if (NULL == pObj) {
    sclError("taosHashInit failed, size:%d", 256);
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  taosHashSetEqualFp(pObj, taosGetDefaultEqualFunction(type)); 

  int32_t code = 0;
  SNodeListNode *nodeList = (SNodeListNode *)pNode;
  SListCell *cell = nodeList->pNodeList->pHead;
H
Haojun Liao 已提交
82
  SScalarParam out = {.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData))};
83

D
dapan1121 已提交
84 85 86 87 88
  int32_t len = 0;
  void *buf = NULL;
  
  for (int32_t i = 0; i < nodeList->pNodeList->length; ++i) {
    SValueNode *valueNode = (SValueNode *)cell->pNode;
D
dapan1121 已提交
89
    
D
dapan1121 已提交
90
    if (valueNode->node.resType.type != type) {
91
      out.columnData->info.type = type;
D
dapan1121 已提交
92 93 94 95 96 97 98 99 100
      if (IS_VAR_DATA_TYPE(type)) {
        if (IS_VAR_DATA_TYPE(valueNode->node.resType.type)) {
          out.columnData->info.bytes = valueNode->node.resType.bytes * TSDB_NCHAR_SIZE;
        } else {
          out.columnData->info.bytes = 64 * TSDB_NCHAR_SIZE;
        }
      } else {
        out.columnData->info.bytes = tDataTypes[type].bytes;
      }
101

102 103
      code = doConvertDataType(valueNode, &out);
      if (code != TSDB_CODE_SUCCESS) {
104
//        sclError("convert data from %d to %d failed", in.type, out.type);
D
dapan1121 已提交
105 106 107 108
        SCL_ERR_JRET(code);
      }

      if (IS_VAR_DATA_TYPE(type)) {
D
dapan1121 已提交
109 110 111
        char* data = colDataGetVarData(out.columnData, 0);
        len = varDataLen(data);
        buf = varDataVal(data);
D
dapan1121 已提交
112 113
      } else {
        len = tDataTypes[type].bytes;
114
        buf = out.columnData->pData;
D
dapan1121 已提交
115 116 117
      }
    } else {
      buf = nodesGetValueFromNode(valueNode);
D
dapan1121 已提交
118 119 120 121 122
      if (IS_VAR_DATA_TYPE(type)) {
        len = varDataLen(buf);
        buf = varDataVal(buf);
      } else {
        len = valueNode->node.resType.bytes;
123
      }
D
dapan1121 已提交
124 125
    }
    
126
    if (taosHashPut(pObj, buf, (size_t)len, NULL, 0)) {
D
dapan1121 已提交
127
      sclError("taosHashPut to set failed");
D
dapan1121 已提交
128 129
      SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
D
dapan1121 已提交
130 131

    cell = cell->pNext;
D
dapan1121 已提交
132 133 134 135 136 137 138 139 140 141
  }

  *data = pObj;
  return TSDB_CODE_SUCCESS;

_return:
  taosHashCleanup(pObj);
  SCL_RET(code);
}

D
dapan1121 已提交
142 143 144 145 146 147 148
void sclFreeRes(SHashObj *res) {
  SScalarParam *p = NULL;
  void *pIter = taosHashIterate(res, NULL);
  while (pIter) {
    p = (SScalarParam *)pIter;

    if (p) {
D
dapan 已提交
149
      sclFreeParam(p);
D
dapan1121 已提交
150 151 152 153 154 155
    }
    pIter = taosHashIterate(res, pIter);
  }
  taosHashCleanup(res);
}

D
dapan1121 已提交
156
void sclFreeParam(SScalarParam *param) {
157 158
  if (param->columnData != NULL) {
    colDataDestroy(param->columnData);
wmmhello's avatar
wmmhello 已提交
159
    taosMemoryFree(param->columnData);
160 161 162 163 164
  }

  if (param->pHashFilter != NULL) {
    taosHashCleanup(param->pHashFilter);
  }
D
dapan1121 已提交
165 166
}

D
dapan1121 已提交
167 168 169 170 171
int32_t sclCopyValueNodeValue(SValueNode *pNode, void **res) {
  if (TSDB_DATA_TYPE_NULL == pNode->node.resType.type) {
    return TSDB_CODE_SUCCESS;
  }
  
wafwerar's avatar
wafwerar 已提交
172
  *res = taosMemoryMalloc(pNode->node.resType.bytes);
D
dapan1121 已提交
173 174 175 176 177 178 179 180 181
  if (NULL == (*res)) {
    sclError("malloc %d failed", pNode->node.resType.bytes);
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  memcpy(*res, nodesGetValueFromNode(pNode), pNode->node.resType.bytes);
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
182 183 184 185
int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t *rowNum) {
  switch (nodeType(node)) {
    case QUERY_NODE_VALUE: {
      SValueNode *valueNode = (SValueNode *)node;
186 187 188 189

      param->numOfRows = 1;
      param->columnData = createColumnInfoData(&valueNode->node.resType, 1);
      if (TSDB_DATA_TYPE_NULL == valueNode->node.resType.type) {
190
        colDataAppendNULL(param->columnData, 0);
191 192
      } else {
        colDataAppend(param->columnData, 0, nodesGetValueFromNode(valueNode), false);
D
dapan1121 已提交
193
      }
D
dapan1121 已提交
194 195
      break;
    }
D
dapan1121 已提交
196 197
    case QUERY_NODE_NODE_LIST: {
      SNodeListNode *nodeList = (SNodeListNode *)node;
198 199
      if (LIST_LENGTH(nodeList->pNodeList) <= 0) {
        sclError("invalid length in nodeList, length:%d", LIST_LENGTH(nodeList->pNodeList));
D
dapan1121 已提交
200 201 202
        SCL_RET(TSDB_CODE_QRY_INVALID_INPUT);
      }

D
dapan1121 已提交
203
      SCL_ERR_RET(scalarGenerateSetFromList((void **)&param->pHashFilter, node, nodeList->dataType.type));
D
dapan 已提交
204
      if (taosHashPut(ctx->pRes, &node, POINTER_BYTES, param, sizeof(*param))) {
205
        taosHashCleanup(param->pHashFilter);
D
dapan 已提交
206 207 208
        sclError("taosHashPut nodeList failed, size:%d", (int32_t)sizeof(*param));
        return TSDB_CODE_QRY_OUT_OF_MEMORY;
      }   
D
dapan1121 已提交
209 210
      break;
    }
X
Xiaoyu Wang 已提交
211
    case QUERY_NODE_COLUMN: {
D
dapan 已提交
212 213
      if (NULL == ctx->pBlockList) {
        sclError("invalid node type for constant calculating, type:%d, src:%p", nodeType(node), ctx->pBlockList);
D
dapan1121 已提交
214 215 216
        SCL_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
      }
      
X
Xiaoyu Wang 已提交
217
      SColumnNode *ref = (SColumnNode *)node;
218 219 220 221 222 223 224 225 226 227 228

      int32_t index = -1;
      for(int32_t i = 0; i < taosArrayGetSize(ctx->pBlockList); ++i) {
        SSDataBlock* pb = taosArrayGetP(ctx->pBlockList, i);
        if (pb->info.blockId == ref->dataBlockId) {
          index = i;
          break;
        }
      }

      if (index == -1) {
D
dapan1121 已提交
229
        sclError("column tupleId is too big, tupleId:%d, dataBlockNum:%d", ref->dataBlockId, (int32_t)taosArrayGetSize(ctx->pBlockList));
D
dapan 已提交
230 231 232
        SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
      }

233 234
      SSDataBlock *block = *(SSDataBlock **)taosArrayGet(ctx->pBlockList, index);
      if (NULL == block || ref->slotId >= block->info.numOfCols) {
D
dapan 已提交
235
        sclError("column slotId is too big, slodId:%d, dataBlockSize:%d", ref->slotId, (int32_t)taosArrayGetSize(block->pDataBlock));
D
dapan1121 已提交
236 237 238
        SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
      }

D
dapan 已提交
239
      SColumnInfoData *columnData = (SColumnInfoData *)taosArrayGet(block->pDataBlock, ref->slotId);
240 241
      param->numOfRows = block->info.rows;
      param->columnData = columnData;
D
dapan1121 已提交
242 243
      break;
    }
244 245 246
    case QUERY_NODE_FUNCTION:
    case QUERY_NODE_OPERATOR:
    case QUERY_NODE_LOGIC_CONDITION: {
D
dapan1121 已提交
247 248 249 250 251 252 253 254
      SScalarParam *res = (SScalarParam *)taosHashGet(ctx->pRes, &node, POINTER_BYTES);
      if (NULL == res) {
        sclError("no result for node, type:%d, node:%p", nodeType(node), node);
        SCL_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
      }
      *param = *res;
      break;
    }
255 256
    default:
      break;
D
dapan1121 已提交
257 258
  }

259 260 261
  if (param->numOfRows > *rowNum) {
    if ((1 != param->numOfRows) && (1 < *rowNum)) {
      sclError("different row nums, rowNum:%d, newRowNum:%d", *rowNum, param->numOfRows);
D
dapan1121 已提交
262 263 264
      SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
    }
    
265
    *rowNum = param->numOfRows;
D
dapan1121 已提交
266 267
  }

268
  param->param = ctx->param;
D
dapan1121 已提交
269 270 271
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
272
int32_t sclInitParamList(SScalarParam **pParams, SNodeList* pParamList, SScalarCtx *ctx, int32_t *paramNum, int32_t *rowNum) {  
D
dapan1121 已提交
273
  int32_t code = 0;
D
dapan1121 已提交
274 275 276 277 278 279 280 281
  if (NULL == pParamList) {
    if (ctx->pBlockList) {
      SSDataBlock *pBlock = taosArrayGet(ctx->pBlockList, 0);
      *rowNum = pBlock->info.rows;
    } else {
      *rowNum = 1;
    }

D
dapan 已提交
282
    *paramNum = 1;
D
dapan1121 已提交
283
  } else {
D
dapan 已提交
284
    *paramNum = pParamList->length;
D
dapan1121 已提交
285 286
  }

D
dapan 已提交
287
  SScalarParam *paramList = taosMemoryCalloc(*paramNum, sizeof(SScalarParam));
D
dapan1121 已提交
288
  if (NULL == paramList) {
D
dapan 已提交
289
    sclError("calloc %d failed", (int32_t)((*paramNum) * sizeof(SScalarParam)));
D
dapan1121 已提交
290
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
D
dapan1121 已提交
291 292
  }

D
dapan1121 已提交
293 294 295 296 297 298
  if (pParamList) {
    SNode *tnode = NULL;
    int32_t i = 0;
    if (SCL_IS_CONST_CALC(ctx)) {
      WHERE_EACH (tnode, pParamList) { 
        if (!SCL_IS_CONST_NODE(tnode)) {
D
dapan 已提交
299
          WHERE_NEXT;
D
dapan1121 已提交
300 301 302 303 304 305 306 307 308 309 310 311
        } else {
          SCL_ERR_JRET(sclInitParam(tnode, &paramList[i], ctx, rowNum));
          ERASE_NODE(pParamList);
        }
        
        ++i;
      }
    } else {
      FOREACH(tnode, pParamList) { 
        SCL_ERR_JRET(sclInitParam(tnode, &paramList[i], ctx, rowNum));
        ++i;
      }
D
dapan1121 已提交
312
    }
D
dapan1121 已提交
313 314 315
  } else {
    paramList[0].numOfRows = *rowNum;
  }
D
dapan1121 已提交
316

D
dapan1121 已提交
317 318
  if (0 == *rowNum) {
    taosMemoryFreeClear(paramList);    
D
dapan1121 已提交
319
  }
D
dapan1121 已提交
320

D
dapan1121 已提交
321
  *pParams = paramList;
D
dapan1121 已提交
322
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
323

D
dapan1121 已提交
324
_return:
wafwerar's avatar
wafwerar 已提交
325
  taosMemoryFreeClear(paramList);
D
dapan1121 已提交
326 327 328 329 330
  SCL_RET(code);
}

int32_t sclInitOperatorParams(SScalarParam **pParams, SOperatorNode *node, SScalarCtx *ctx, int32_t *rowNum) {
  int32_t code = 0;
D
dapan1121 已提交
331
  int32_t paramNum = scalarGetOperatorParamNum(node->opType);
D
dapan1121 已提交
332 333 334 335 336
  if (NULL == node->pLeft || (paramNum == 2 && NULL == node->pRight)) {
    sclError("invalid operation node, left:%p, right:%p", node->pLeft, node->pRight);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }
  
wafwerar's avatar
wafwerar 已提交
337
  SScalarParam *paramList = taosMemoryCalloc(paramNum, sizeof(SScalarParam));
D
dapan1121 已提交
338 339
  if (NULL == paramList) {
    sclError("calloc %d failed", (int32_t)(paramNum * sizeof(SScalarParam)));
D
dapan1121 已提交
340 341 342
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

D
dapan1121 已提交
343
  SCL_ERR_JRET(sclInitParam(node->pLeft, &paramList[0], ctx, rowNum));
D
dapan1121 已提交
344
  if (paramNum > 1) {
D
dapan1121 已提交
345
    SCL_ERR_JRET(sclInitParam(node->pRight, &paramList[1], ctx, rowNum));
D
dapan1121 已提交
346 347
  }

D
dapan1121 已提交
348
  *pParams = paramList;
D
dapan1121 已提交
349
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
350 351

_return:
wafwerar's avatar
wafwerar 已提交
352
  taosMemoryFreeClear(paramList);
D
dapan1121 已提交
353
  SCL_RET(code);
D
dapan1121 已提交
354 355
}

356
int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *output) {
D
dapan1121 已提交
357 358
  SScalarParam *params = NULL;
  int32_t rowNum = 0;
D
dapan 已提交
359
  int32_t paramNum = 0;
D
dapan1121 已提交
360
  int32_t code = 0;
D
dapan 已提交
361
  SCL_ERR_RET(sclInitParamList(&params, node->pParameterList, ctx, &paramNum, &rowNum));
D
dapan1121 已提交
362

D
dapan1121 已提交
363 364 365
  if (fmIsUserDefinedFunc(node->funcId)) {
    UdfcFuncHandle udfHandle = NULL;
    
366 367 368 369 370
    code = setupUdf(node->functionName, &udfHandle);
    if (code != 0) {
      sclError("fmExecFunction error. setupUdf. function name: %s, code:%d", node->functionName, code);
      goto _return;
    }
D
dapan1121 已提交
371
    code = callUdfScalarFunc(udfHandle, params, paramNum, output);
372 373 374 375 376 377 378 379 380
    if (code != 0) {
      sclError("fmExecFunction error. callUdfScalarFunc. function name: %s, udf code:%d", node->functionName, code);
      goto _return;
    }
    code = teardownUdf(udfHandle);
    if (code != 0) {
      sclError("fmExecFunction error. callUdfScalarFunc. function name: %s, udf code:%d", node->functionName, code);
      goto _return;
    }
D
dapan1121 已提交
381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399
  } else {
    SScalarFuncExecFuncs ffpSet = {0};
    code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet);
    if (code) {
      sclError("fmGetFuncExecFuncs failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
      SCL_ERR_JRET(code);
    }
  
    output->columnData = createColumnInfoData(&node->node.resType, rowNum);
    if (output->columnData == NULL) {
      sclError("calloc %d failed", (int32_t)(rowNum * output->columnData->info.bytes));
      SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    code = (*ffpSet.process)(params, paramNum, output);
    if (code) {
      sclError("scalar function exec failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
      SCL_ERR_JRET(code);
    }
D
dapan1121 已提交
400 401 402 403
  }

_return:

D
dapan 已提交
404
  for (int32_t i = 0; i < paramNum; ++i) {
H
Haojun Liao 已提交
405
//    sclFreeParamNoData(params + i);
D
dapan1121 已提交
406 407
  }

wafwerar's avatar
wafwerar 已提交
408
  taosMemoryFreeClear(params);
D
dapan1121 已提交
409 410 411 412 413 414 415 416 417 418 419 420
  SCL_RET(code);
}

int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *output) {
  if (NULL == node->pParameterList || node->pParameterList->length <= 0) {
    sclError("invalid logic parameter list, list:%p, paramNum:%d", node->pParameterList, node->pParameterList ? node->pParameterList->length : 0);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (TSDB_DATA_TYPE_BOOL != node->node.resType.type) {
    sclError("invalid logic resType, type:%d", node->node.resType.type);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
D
dapan1121 已提交
421 422
  }

D
dapan1121 已提交
423 424 425 426 427 428 429
  if (LOGIC_COND_TYPE_NOT == node->condType && node->pParameterList->length > 1) {
    sclError("invalid NOT operation parameter number, paramNum:%d", node->pParameterList->length);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SScalarParam *params = NULL;
  int32_t rowNum = 0;
D
dapan 已提交
430
  int32_t paramNum = 0;
D
dapan1121 已提交
431
  int32_t code = 0;
D
dapan 已提交
432
  SCL_ERR_RET(sclInitParamList(&params, node->pParameterList, ctx, &paramNum, &rowNum));
D
dapan1121 已提交
433 434 435 436
  if (NULL == params) {
    output->numOfRows = 0;
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
437

438 439 440 441 442 443
  int32_t type = node->node.resType.type;
  output->numOfRows = rowNum;

  SDataType t = {.type = type, .bytes = tDataTypes[type].bytes};
  output->columnData = createColumnInfoData(&t, rowNum);
  if (output->columnData == NULL) {
D
dapan1121 已提交
444
    sclError("calloc %d failed", (int32_t)(rowNum * sizeof(bool)));
D
dapan1121 已提交
445 446
    SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
D
dapan1121 已提交
447

D
dapan1121 已提交
448
  bool value = false;
D
dapan 已提交
449
  bool complete = true;
D
dapan1121 已提交
450
  for (int32_t i = 0; i < rowNum; ++i) {
D
dapan 已提交
451 452
    complete = true;
    for (int32_t m = 0; m < paramNum; ++m) {
D
dapan1121 已提交
453
      if (NULL == params[m].columnData) {
D
dapan 已提交
454
        complete = false;
D
dapan1121 已提交
455 456
        continue;
      }
457 458 459
      char* p = colDataGetData(params[m].columnData, i);
      GET_TYPED_DATA(value, bool, params[m].columnData->info.type, p);

D
dapan1121 已提交
460
      if (LOGIC_COND_TYPE_AND == node->condType && (false == value)) {
D
dapan1121 已提交
461
        complete = true;
D
dapan1121 已提交
462 463
        break;
      } else if (LOGIC_COND_TYPE_OR == node->condType && value) {
D
dapan1121 已提交
464
        complete = true;
D
dapan1121 已提交
465 466 467 468 469 470
        break;
      } else if (LOGIC_COND_TYPE_NOT == node->condType) {
        value = !value;
      }
    }

D
dapan 已提交
471 472 473
    if (complete) {
      colDataAppend(output->columnData, i, (char*) &value, false);
    }
D
dapan1121 已提交
474 475
  }

D
dapan1121 已提交
476 477 478 479 480
  if (SCL_IS_CONST_CALC(ctx) && (false == complete)) {
    sclFreeParam(output);
    output->numOfRows = 0;
  }

D
dapan1121 已提交
481
_return:
D
dapan1121 已提交
482

D
dapan 已提交
483
  for (int32_t i = 0; i < paramNum; ++i) {
H
Haojun Liao 已提交
484
//    sclFreeParamNoData(params + i);
D
dapan1121 已提交
485 486
  }

wafwerar's avatar
wafwerar 已提交
487
  taosMemoryFreeClear(params);
D
dapan1121 已提交
488
  SCL_RET(code);
D
dapan1121 已提交
489 490 491 492 493 494
}

int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *output) {
  SScalarParam *params = NULL;
  int32_t rowNum = 0;
  int32_t code = 0;
495

D
dapan1121 已提交
496
  SCL_ERR_RET(sclInitOperatorParams(&params, node, ctx, &rowNum));
497 498 499
  output->columnData = createColumnInfoData(&node->node.resType, rowNum);
  if (output->columnData == NULL) {
    sclError("calloc failed, size:%d", (int32_t)rowNum * node->node.resType.bytes);
D
dapan1121 已提交
500 501 502 503 504
    SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  _bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(node->opType);

D
dapan1121 已提交
505
  int32_t paramNum = scalarGetOperatorParamNum(node->opType);
D
dapan1121 已提交
506 507
  SScalarParam* pLeft = &params[0];
  SScalarParam* pRight = paramNum > 1 ? &params[1] : NULL;
508

D
dapan 已提交
509
  OperatorFn(pLeft, pRight, output, TSDB_ORDER_ASC);
D
dapan1121 已提交
510 511

_return:
D
dapan1121 已提交
512
  for (int32_t i = 0; i < paramNum; ++i) {
513
//    sclFreeParam(&params[i]);
D
dapan1121 已提交
514 515
  }

wafwerar's avatar
wafwerar 已提交
516
  taosMemoryFreeClear(params);
D
dapan1121 已提交
517
  SCL_RET(code);
D
dapan1121 已提交
518 519
}

D
dapan1121 已提交
520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541
EDealRes sclRewriteBasedOnOptr(SNode** pNode, SScalarCtx *ctx, EOperatorType opType) {
  if (opType <= OP_TYPE_CALC_MAX) {
    SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
    if (NULL == res) {
      sclError("make value node failed");    
      ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
      return DEAL_RES_ERROR;
    }
    
    res->node.resType.type = TSDB_DATA_TYPE_NULL;
    
    nodesDestroyNode(*pNode);
    *pNode = (SNode*)res;
  } else {
    SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
    if (NULL == res) {
      sclError("make value node failed");    
      ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
      return DEAL_RES_ERROR;
    }
    
    res->node.resType.type = TSDB_DATA_TYPE_BOOL;
D
dapan1121 已提交
542
    res->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes;
D
dapan1121 已提交
543 544 545 546 547 548 549 550 551 552
    res->datum.b = false;
    
    nodesDestroyNode(*pNode);
    *pNode = (SNode*)res;
  }

  return DEAL_RES_CONTINUE;
}


D
dapan 已提交
553
EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) {
D
dapan1121 已提交
554 555 556 557
  SOperatorNode *node = (SOperatorNode *)*pNode;

  if (node->pLeft && (QUERY_NODE_VALUE == nodeType(node->pLeft))) {
    SValueNode *valueNode = (SValueNode *)node->pLeft;
D
dapan1121 已提交
558
    if (SCL_IS_NULL_VALUE_NODE(valueNode) && (node->opType != OP_TYPE_IS_NULL && node->opType != OP_TYPE_IS_NOT_NULL)) {
D
dapan1121 已提交
559 560
      return sclRewriteBasedOnOptr(pNode, ctx, node->opType);
    }
D
dapan 已提交
561 562 563 564 565

    if (IS_STR_DATA_TYPE(valueNode->node.resType.type) && node->pRight && nodesIsExprNode(node->pRight) 
      && ((SExprNode*)node->pRight)->resType.type == TSDB_DATA_TYPE_TIMESTAMP) {
      sclConvertToTsValueNode(((SExprNode*)node->pRight)->resType.precision, valueNode);
    }
D
dapan1121 已提交
566 567 568 569
  }

  if (node->pRight && (QUERY_NODE_VALUE == nodeType(node->pRight))) {
    SValueNode *valueNode = (SValueNode *)node->pRight;
D
dapan1121 已提交
570
    if (SCL_IS_NULL_VALUE_NODE(valueNode) && (node->opType != OP_TYPE_IS_NULL && node->opType != OP_TYPE_IS_NOT_NULL)) {
D
dapan1121 已提交
571 572
      return sclRewriteBasedOnOptr(pNode, ctx, node->opType);
    }
D
dapan 已提交
573 574 575 576 577

    if (IS_STR_DATA_TYPE(valueNode->node.resType.type) && node->pLeft && nodesIsExprNode(node->pLeft) 
      && ((SExprNode*)node->pLeft)->resType.type == TSDB_DATA_TYPE_TIMESTAMP) {
      sclConvertToTsValueNode(((SExprNode*)node->pLeft)->resType.precision, valueNode);
    }
D
dapan1121 已提交
578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603
  }

  if (node->pRight && (QUERY_NODE_NODE_LIST == nodeType(node->pRight))) {
    SNodeListNode *listNode = (SNodeListNode *)node->pRight;
    SNode* tnode = NULL;
    WHERE_EACH(tnode, listNode->pNodeList) {
      if (SCL_IS_NULL_VALUE_NODE(tnode)) {
        if (node->opType == OP_TYPE_IN) {
          ERASE_NODE(listNode->pNodeList);
          continue;
        } else { //OP_TYPE_NOT_IN
          return sclRewriteBasedOnOptr(pNode, ctx, node->opType);
        }
      }

      WHERE_NEXT;
    }

    if (listNode->pNodeList->length <= 0) {
      return sclRewriteBasedOnOptr(pNode, ctx, node->opType);
    }
  }

  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
604
EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) {
D
dapan1121 已提交
605
  SFunctionNode *node = (SFunctionNode *)*pNode;
D
dapan1121 已提交
606
  SNode* tnode = NULL;
607
  if (!fmIsScalarFunc(node->funcId)) {
D
dapan1121 已提交
608 609
    return DEAL_RES_CONTINUE;
  }
610

D
dapan1121 已提交
611 612 613 614 615 616
  FOREACH(tnode, node->pParameterList) {
    if (!SCL_IS_CONST_NODE(tnode)) {
      return DEAL_RES_CONTINUE;
    }
  }

D
dapan1121 已提交
617
  SScalarParam output = {0};
618

619
  ctx->code = sclExecFunction(node, ctx, &output);
D
dapan 已提交
620
  if (ctx->code) {
D
dapan1121 已提交
621 622 623
    return DEAL_RES_ERROR;
  }

D
dapan1121 已提交
624
  SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
D
dapan1121 已提交
625 626
  if (NULL == res) {
    sclError("make value node failed");
D
dapan1121 已提交
627
    sclFreeParam(&output);
D
dapan 已提交
628
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
D
dapan1121 已提交
629 630 631
    return DEAL_RES_ERROR;
  }

632 633
  res->translate = true;

634 635
  if (colDataIsNull_s(output.columnData, 0)) {
    res->node.resType.type = TSDB_DATA_TYPE_NULL;
D
dapan1121 已提交
636
  } else {
D
dapan1121 已提交
637 638 639 640
    res->node.resType.type = output.columnData->info.type;
    res->node.resType.bytes = output.columnData->info.bytes;
    res->node.resType.scale = output.columnData->info.scale;
    res->node.resType.precision = output.columnData->info.precision;
641 642
    int32_t type = output.columnData->info.type;
    if (IS_VAR_DATA_TYPE(type)) {
643
      res->datum.p = taosMemoryCalloc(res->node.resType.bytes + VARSTR_HEADER_SIZE + 1, 1);
644
      memcpy(res->datum.p, output.columnData->pData, varDataTLen(output.columnData->pData));
645
    } else {
D
dapan1121 已提交
646
      nodesSetValueNodeValue(res, output.columnData->pData);
647
    }
D
dapan1121 已提交
648
  }
649

D
dapan1121 已提交
650 651 652
  nodesDestroyNode(*pNode);
  *pNode = (SNode*)res;

D
dapan1121 已提交
653
  sclFreeParam(&output);
D
dapan1121 已提交
654 655 656
  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
657
EDealRes sclRewriteLogic(SNode** pNode, SScalarCtx *ctx) {
D
dapan1121 已提交
658 659
  SLogicConditionNode *node = (SLogicConditionNode *)*pNode;

H
Haojun Liao 已提交
660
  SScalarParam output = {0};
D
dapan 已提交
661 662
  ctx->code = sclExecLogic(node, ctx, &output);
  if (ctx->code) {
D
dapan1121 已提交
663 664 665
    return DEAL_RES_ERROR;
  }

D
dapan1121 已提交
666 667 668 669
  if (0 == output.numOfRows) {
    return DEAL_RES_CONTINUE;
  }

D
dapan1121 已提交
670
  SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
D
dapan1121 已提交
671 672
  if (NULL == res) {
    sclError("make value node failed");
673
    sclFreeParam(&output);
D
dapan 已提交
674
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
D
dapan1121 已提交
675 676 677 678
    return DEAL_RES_ERROR;
  }

  res->node.resType = node->node.resType;
679
  res->translate = true;
D
dapan1121 已提交
680

681 682 683 684
  int32_t type = output.columnData->info.type;
  if (IS_VAR_DATA_TYPE(type)) {
    res->datum.p = output.columnData->pData;
    output.columnData->pData = NULL;
D
dapan1121 已提交
685
  } else {
D
dapan1121 已提交
686
    nodesSetValueNodeValue(res, output.columnData->pData);
D
dapan1121 已提交
687
  }
D
dapan1121 已提交
688 689 690 691

  nodesDestroyNode(*pNode);
  *pNode = (SNode*)res;

D
dapan1121 已提交
692
  sclFreeParam(&output);
D
dapan1121 已提交
693 694 695
  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
696
EDealRes sclRewriteOperator(SNode** pNode, SScalarCtx *ctx) {
D
dapan1121 已提交
697
  SOperatorNode *node = (SOperatorNode *)*pNode;
D
dapan1121 已提交
698

D
dapan1121 已提交
699
  if ((!SCL_IS_CONST_NODE(node->pLeft)) || (!SCL_IS_CONST_NODE(node->pRight))) {
D
dapan 已提交
700
    return sclRewriteNonConstOperator(pNode, ctx);
D
dapan1121 已提交
701 702
  }

H
Haojun Liao 已提交
703
  SScalarParam output = {.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData))};
D
dapan 已提交
704 705
  ctx->code = sclExecOperator(node, ctx, &output);
  if (ctx->code) {
D
dapan1121 已提交
706 707 708
    return DEAL_RES_ERROR;
  }

D
dapan1121 已提交
709
  SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
D
dapan1121 已提交
710
  if (NULL == res) {
711 712
    sclError("make value node failed");
    sclFreeParam(&output);
D
dapan 已提交
713
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
D
dapan1121 已提交
714 715 716
    return DEAL_RES_ERROR;
  }

717
  res->translate = true;
D
dapan1121 已提交
718

719 720
  if (colDataIsNull_s(output.columnData, 0)) {
    res->node.resType.type = TSDB_DATA_TYPE_NULL;
D
dapan1121 已提交
721
  } else {
722 723 724 725 726 727
    res->node.resType = node->node.resType;
    int32_t type = output.columnData->info.type;
    if (IS_VAR_DATA_TYPE(type)) {  // todo refactor
      res->datum.p = output.columnData->pData;
      output.columnData->pData = NULL;
    } else {
D
dapan1121 已提交
728
      nodesSetValueNodeValue(res, output.columnData->pData);    
729
    }
D
dapan1121 已提交
730
  }
D
dapan1121 已提交
731 732 733 734

  nodesDestroyNode(*pNode);
  *pNode = (SNode*)res;

H
Haojun Liao 已提交
735
  sclFreeParam(&output);
D
dapan1121 已提交
736 737 738 739
  return DEAL_RES_CONTINUE;
}

EDealRes sclConstantsRewriter(SNode** pNode, void* pContext) {
D
dapan 已提交
740 741
  SScalarCtx *ctx = (SScalarCtx *)pContext;

D
dapan1121 已提交
742
  if (QUERY_NODE_FUNCTION == nodeType(*pNode)) {
D
dapan 已提交
743
    return sclRewriteFunction(pNode, ctx);
D
dapan1121 已提交
744 745 746
  }

  if (QUERY_NODE_LOGIC_CONDITION == nodeType(*pNode)) {
D
dapan 已提交
747
    return sclRewriteLogic(pNode, ctx);
D
dapan1121 已提交
748 749
  }

D
dapan1121 已提交
750
  if (QUERY_NODE_OPERATOR == nodeType(*pNode)) {
D
dapan 已提交
751
    return sclRewriteOperator(pNode, ctx);
752
  }
753 754

  return DEAL_RES_CONTINUE;
D
dapan1121 已提交
755 756
}

D
dapan 已提交
757
EDealRes sclWalkFunction(SNode* pNode, SScalarCtx *ctx) {
D
dapan1121 已提交
758
  SFunctionNode *node = (SFunctionNode *)pNode;
D
dapan1121 已提交
759
  SScalarParam output = {0};
760

761
  ctx->code = sclExecFunction(node, ctx, &output);
D
dapan1121 已提交
762 763
  if (ctx->code) {
    return DEAL_RES_ERROR;
D
dapan1121 已提交
764 765
  }

D
dapan1121 已提交
766
  if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
D
dapan1121 已提交
767 768
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
D
dapan1121 已提交
769 770
  }

D
dapan1121 已提交
771 772 773
  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
774
EDealRes sclWalkLogic(SNode* pNode, SScalarCtx *ctx) {
D
dapan1121 已提交
775
  SLogicConditionNode *node = (SLogicConditionNode *)pNode;
D
dapan1121 已提交
776
  SScalarParam output = {0};
777

D
dapan1121 已提交
778 779 780
  ctx->code = sclExecLogic(node, ctx, &output);
  if (ctx->code) {
    return DEAL_RES_ERROR;
D
dapan1121 已提交
781 782
  }

D
dapan1121 已提交
783
  if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
D
dapan1121 已提交
784
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
D
dapan1121 已提交
785
    return DEAL_RES_ERROR;
D
dapan1121 已提交
786 787 788 789 790
  }

  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
791
EDealRes sclWalkOperator(SNode* pNode, SScalarCtx *ctx) {
D
dapan1121 已提交
792
  SOperatorNode *node = (SOperatorNode *)pNode;
D
dapan1121 已提交
793
  SScalarParam output = {0};
D
dapan1121 已提交
794
  
D
dapan1121 已提交
795 796
  ctx->code = sclExecOperator(node, ctx, &output);
  if (ctx->code) {
D
dapan1121 已提交
797 798
    return DEAL_RES_ERROR;
  }
D
dapan1121 已提交
799

D
dapan1121 已提交
800
  if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
D
dapan1121 已提交
801
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
D
dapan1121 已提交
802 803 804
    return DEAL_RES_ERROR;
  }

D
dapan1121 已提交
805 806 807
  return DEAL_RES_CONTINUE;
}

808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856
EDealRes sclWalkTarget(SNode* pNode, SScalarCtx *ctx) {
  STargetNode *target = (STargetNode *)pNode;

  if (target->dataBlockId >= taosArrayGetSize(ctx->pBlockList)) {
    sclError("target tupleId is too big, tupleId:%d, dataBlockNum:%d", target->dataBlockId, (int32_t)taosArrayGetSize(ctx->pBlockList));
    ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
    return DEAL_RES_ERROR;
  }

  int32_t index = -1;
  for(int32_t i = 0; i < taosArrayGetSize(ctx->pBlockList); ++i) {
    SSDataBlock* pb = taosArrayGetP(ctx->pBlockList, i);
    if (pb->info.blockId == target->dataBlockId) {
      index = i;
      break;
    }
  }

  if (index == -1) {
    sclError("column tupleId is too big, tupleId:%d, dataBlockNum:%d", target->dataBlockId, (int32_t)taosArrayGetSize(ctx->pBlockList));
    ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
    return DEAL_RES_ERROR;
  }

  SSDataBlock *block = *(SSDataBlock **)taosArrayGet(ctx->pBlockList, index);

  if (target->slotId >= taosArrayGetSize(block->pDataBlock)) {
    sclError("target slot not exist, dataBlockId:%d, slotId:%d, dataBlockNum:%d", target->dataBlockId, target->slotId, (int32_t)taosArrayGetSize(block->pDataBlock));
    ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
    return DEAL_RES_ERROR;
  }

  // if block->pDataBlock is not enough, there are problems if target->slotId bigger than the size of block->pDataBlock,
  SColumnInfoData *col = taosArrayGet(block->pDataBlock, target->slotId);

  SScalarParam *res = (SScalarParam *)taosHashGet(ctx->pRes, (void *)&target->pExpr, POINTER_BYTES);
  if (NULL == res) {
    sclError("no valid res in hash, node:%p, type:%d", target->pExpr, nodeType(target->pExpr));
    ctx->code = TSDB_CODE_QRY_APP_ERROR;
    return DEAL_RES_ERROR;
  }

  colDataAssign(col, res->columnData, res->numOfRows);
  block->info.rows = res->numOfRows;

  sclFreeParam(res);
  taosHashRemove(ctx->pRes, (void *)&target->pExpr, POINTER_BYTES);
  return DEAL_RES_CONTINUE;
}
D
dapan 已提交
857

D
dapan1121 已提交
858
EDealRes sclCalcWalker(SNode* pNode, void* pContext) {
X
Xiaoyu Wang 已提交
859
  if (QUERY_NODE_VALUE == nodeType(pNode) || QUERY_NODE_NODE_LIST == nodeType(pNode) || QUERY_NODE_COLUMN == nodeType(pNode)) {
D
dapan1121 已提交
860
    return DEAL_RES_CONTINUE;
D
dapan1121 已提交
861
  }
D
dapan 已提交
862 863

  SScalarCtx *ctx = (SScalarCtx *)pContext;
D
dapan1121 已提交
864
  if (QUERY_NODE_FUNCTION == nodeType(pNode)) {
D
dapan 已提交
865
    return sclWalkFunction(pNode, ctx);
D
dapan1121 已提交
866
  }
D
dapan1121 已提交
867

D
dapan1121 已提交
868
  if (QUERY_NODE_LOGIC_CONDITION == nodeType(pNode)) {
D
dapan 已提交
869
    return sclWalkLogic(pNode, ctx);
D
dapan1121 已提交
870
  }
D
dapan1121 已提交
871

D
dapan1121 已提交
872
  if (QUERY_NODE_OPERATOR == nodeType(pNode)) {
D
dapan 已提交
873
    return sclWalkOperator(pNode, ctx);
D
dapan1121 已提交
874
  }
D
dapan1121 已提交
875

876 877 878
  if (QUERY_NODE_TARGET == nodeType(pNode)) {
    return sclWalkTarget(pNode, ctx);
  }
D
dapan1121 已提交
879

D
dapan 已提交
880
  sclError("invalid node type for scalar calculating, type:%d", nodeType(pNode));
D
dapan1121 已提交
881 882
  ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
  return DEAL_RES_ERROR;
D
dapan1121 已提交
883 884 885 886 887 888 889 890
}

int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes) {
  if (NULL == pNode) {
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t code = 0;
D
dapan 已提交
891
  SScalarCtx ctx = {0};
892
  ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
D
dapan 已提交
893 894 895 896
  if (NULL == ctx.pRes) {
    sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM);
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
D
dapan1121 已提交
897
  
X
Xiaoyu Wang 已提交
898
  nodesRewriteExprPostOrder(&pNode, sclConstantsRewriter, (void *)&ctx);
D
dapan 已提交
899
  SCL_ERR_JRET(ctx.code);
D
dapan1121 已提交
900 901
  *pRes = pNode;

D
dapan 已提交
902 903 904
_return:
  sclFreeRes(ctx.pRes);
  return code;
D
dapan1121 已提交
905 906
}

D
dapan 已提交
907
int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
D
dapan1121 已提交
908
  if (NULL == pNode || NULL == pBlockList) {
D
dapan1121 已提交
909 910 911 912
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t code = 0;
913
  SScalarCtx ctx = {.code = 0, .pBlockList = pBlockList, .param = pDst->param};
914

915
  // TODO: OPT performance
916
  ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
D
dapan1121 已提交
917 918 919 920
  if (NULL == ctx.pRes) {
    sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM);
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
D
dapan1121 已提交
921
  
X
Xiaoyu Wang 已提交
922
  nodesWalkExprPostOrder(pNode, sclCalcWalker, (void *)&ctx);
D
dapan 已提交
923
  SCL_ERR_JRET(ctx.code);
D
dapan1121 已提交
924

D
dapan1121 已提交
925 926 927 928 929 930 931
  if (pDst) {
    SScalarParam *res = (SScalarParam *)taosHashGet(ctx.pRes, (void *)&pNode, POINTER_BYTES);
    if (NULL == res) {
      sclError("no valid res in hash, node:%p, type:%d", pNode, nodeType(pNode));
      SCL_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }
    
932 933
    colDataAssign(pDst->columnData, res->columnData, res->numOfRows);
    pDst->numOfRows = res->numOfRows;
D
dapan1121 已提交
934
    taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES);
D
dapan1121 已提交
935
  }
D
dapan1121 已提交
936

D
dapan 已提交
937
_return:
D
dapan1121 已提交
938
  //nodesDestroyNode(pNode);
D
dapan 已提交
939 940
  sclFreeRes(ctx.pRes);
  return code;
D
dapan1121 已提交
941
}