scalar.c 28.1 KB
Newer Older
D
dapan1121 已提交
1 2
#include "function.h"
#include "functionMgt.h"
H
Haojun Liao 已提交
3 4
#include "nodes.h"
#include "querynodes.h"
D
dapan1121 已提交
5
#include "sclInt.h"
H
Haojun Liao 已提交
6 7 8
#include "sclvector.h"
#include "tcommon.h"
#include "tdatablock.h"
9
#include "scalar.h"
S
slzhou 已提交
10
#include "tudf.h"
D
fix bug  
dapan1121 已提交
11
#include "ttime.h"
D
dapan1121 已提交
12

D
dapan1121 已提交
13
/*
 * Returns how many operands the given operator consumes.
 *
 * The IS [NOT] NULL / TRUE / FALSE / UNKNOWN tests and unary MINUS take a
 * single operand; every other operator type is treated as binary.
 */
int32_t scalarGetOperatorParamNum(EOperatorType type) {
  switch (type) {
    case OP_TYPE_IS_NULL:
    case OP_TYPE_IS_NOT_NULL:
    case OP_TYPE_IS_TRUE:
    case OP_TYPE_IS_NOT_TRUE:
    case OP_TYPE_IS_FALSE:
    case OP_TYPE_IS_NOT_FALSE:
    case OP_TYPE_IS_UNKNOWN:
    case OP_TYPE_IS_NOT_UNKNOWN:
    case OP_TYPE_MINUS:
      return 1;
    default:
      return 2;
  }
}

D
dapan 已提交
23 24 25 26 27 28 29
/*
 * Rewrites a string-valued value node in place as a TIMESTAMP value node.
 *
 * precision  timestamp precision handed to convertStringToTimestamp()
 * valueNode  node whose datum.p holds the time string; the string buffer is
 *            released here and the node's result type becomes TIMESTAMP
 *
 * If the string cannot be converted the timestamp value is set to 0.
 */
void sclConvertToTsValueNode(int8_t precision, SValueNode* valueNode) {
  // Remember the string buffer first: the conversion stores its result in
  // datum.i (presumably sharing storage with datum.p -- confirm), and the
  // buffer still has to be released afterwards.
  char *rawStr = valueNode->datum.p;

  if (TSDB_CODE_SUCCESS !=
      convertStringToTimestamp(valueNode->node.resType.type, valueNode->datum.p, precision, &valueNode->datum.i)) {
    valueNode->datum.i = 0;
  }

  taosMemoryFree(rawStr);
  valueNode->typeData = valueNode->datum.i;

  valueNode->node.resType.type  = TSDB_DATA_TYPE_TIMESTAMP;
  valueNode->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes;
}


37
/*
 * Allocates a zeroed SColumnInfoData, copies type/bytes/scale/precision from
 * pType and reserves room for numOfRows rows.
 *
 * Returns the new column, or NULL with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY when either the struct allocation or the capacity
 * reservation fails.
 */
SColumnInfoData* createColumnInfoData(SDataType* pType, int32_t numOfRows) {
  SColumnInfoData* pCol = taosMemoryCalloc(1, sizeof(SColumnInfoData));
  if (NULL == pCol) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pCol->info.type      = pType->type;
  pCol->info.bytes     = pType->bytes;
  pCol->info.scale     = pType->scale;
  pCol->info.precision = pType->precision;

  if (TSDB_CODE_SUCCESS == colInfoDataEnsureCapacity(pCol, 0, numOfRows)) {
    return pCol;
  }

  // Capacity reservation failed: report it and drop the half-built column.
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  taosMemoryFree(pCol);
  return NULL;
}

59 60 61 62
/*
 * Converts the single value held by pValueNode into the type already
 * configured on out->columnData.
 *
 * pValueNode  source value node
 * out         destination parameter; out->columnData must already exist and
 *             carry the target type information
 *
 * Returns the code from vectorConvertImpl(), TSDB_CODE_QRY_OUT_OF_MEMORY when
 * the temporary input column cannot be created, or the failure code of
 * colInfoDataEnsureCapacity() when the output column cannot be sized.
 */
int32_t doConvertDataType(SValueNode* pValueNode, SScalarParam* out) {
  SScalarParam in = {.numOfRows = 1};
  in.columnData = createColumnInfoData(&pValueNode->node.resType, 1);
  if (NULL == in.columnData) {
    // createColumnInfoData() returns NULL on allocation failure; appending
    // to it would dereference a NULL column.
    sclError("create input column failed, type:%d", pValueNode->node.resType.type);
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }
  colDataAppend(in.columnData, 0, nodesGetValueFromNode(pValueNode), false);

  // Only convert when the output column really has room for the one row.
  int32_t code = colInfoDataEnsureCapacity(out->columnData, 0, 1);
  if (TSDB_CODE_SUCCESS == code) {
    code = vectorConvertImpl(&in, out);
  }

  sclFreeParam(&in);
  return code;
}

D
dapan1121 已提交
71 72 73 74 75 76 77 78 79 80 81 82
int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type) {
  SHashObj *pObj = taosHashInit(256, taosGetDefaultHashFunction(type), true, false);
  if (NULL == pObj) {
    sclError("taosHashInit failed, size:%d", 256);
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  taosHashSetEqualFp(pObj, taosGetDefaultEqualFunction(type)); 

  int32_t code = 0;
  SNodeListNode *nodeList = (SNodeListNode *)pNode;
  SListCell *cell = nodeList->pNodeList->pHead;
H
Haojun Liao 已提交
83
  SScalarParam out = {.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData))};
84

D
dapan1121 已提交
85 86 87 88 89
  int32_t len = 0;
  void *buf = NULL;
  
  for (int32_t i = 0; i < nodeList->pNodeList->length; ++i) {
    SValueNode *valueNode = (SValueNode *)cell->pNode;
D
dapan1121 已提交
90
    
D
dapan1121 已提交
91
    if (valueNode->node.resType.type != type) {
92
      out.columnData->info.type = type;
D
dapan1121 已提交
93 94 95 96 97 98 99 100 101
      if (IS_VAR_DATA_TYPE(type)) {
        if (IS_VAR_DATA_TYPE(valueNode->node.resType.type)) {
          out.columnData->info.bytes = valueNode->node.resType.bytes * TSDB_NCHAR_SIZE;
        } else {
          out.columnData->info.bytes = 64 * TSDB_NCHAR_SIZE;
        }
      } else {
        out.columnData->info.bytes = tDataTypes[type].bytes;
      }
102

103 104
      code = doConvertDataType(valueNode, &out);
      if (code != TSDB_CODE_SUCCESS) {
105
//        sclError("convert data from %d to %d failed", in.type, out.type);
D
dapan1121 已提交
106 107 108 109
        SCL_ERR_JRET(code);
      }

      if (IS_VAR_DATA_TYPE(type)) {
D
dapan1121 已提交
110 111 112
        char* data = colDataGetVarData(out.columnData, 0);
        len = varDataLen(data);
        buf = varDataVal(data);
D
dapan1121 已提交
113 114
      } else {
        len = tDataTypes[type].bytes;
115
        buf = out.columnData->pData;
D
dapan1121 已提交
116 117 118
      }
    } else {
      buf = nodesGetValueFromNode(valueNode);
D
dapan1121 已提交
119 120 121 122 123
      if (IS_VAR_DATA_TYPE(type)) {
        len = varDataLen(buf);
        buf = varDataVal(buf);
      } else {
        len = valueNode->node.resType.bytes;
124
      }
D
dapan1121 已提交
125 126
    }
    
127
    if (taosHashPut(pObj, buf, (size_t)len, NULL, 0)) {
D
dapan1121 已提交
128
      sclError("taosHashPut to set failed");
D
dapan1121 已提交
129 130
      SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
D
dapan1121 已提交
131 132

    cell = cell->pNext;
D
dapan1121 已提交
133 134 135 136 137 138 139 140 141 142
  }

  *data = pObj;
  return TSDB_CODE_SUCCESS;

_return:
  taosHashCleanup(pObj);
  SCL_RET(code);
}

D
dapan1121 已提交
143 144 145 146 147 148 149
/*
 * Releases every SScalarParam stored in the result hash and then destroys
 * the hash itself.
 */
void sclFreeRes(SHashObj *res) {
  for (void *pIter = taosHashIterate(res, NULL); NULL != pIter; pIter = taosHashIterate(res, pIter)) {
    sclFreeParam((SScalarParam *)pIter);
  }

  taosHashCleanup(res);
}

D
dapan1121 已提交
157
/*
 * Releases the resources held by a scalar parameter: its column data (both
 * the column contents and the SColumnInfoData struct) and its hash filter.
 * The SScalarParam struct itself is not freed and its pointers are not reset.
 */
void sclFreeParam(SScalarParam *param) {
  SColumnInfoData *pCol = param->columnData;
  if (NULL != pCol) {
    colDataDestroy(pCol);
    taosMemoryFree(pCol);
  }

  if (NULL != param->pHashFilter) {
    taosHashCleanup(param->pHashFilter);
  }
}

D
dapan1121 已提交
168 169 170 171 172
/*
 * Copies the raw value of a value node into a freshly allocated buffer.
 *
 * pNode  source value node
 * res    receives the new buffer (resType.bytes long); left untouched when
 *        the node is of NULL type
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY when the buffer
 * cannot be allocated.
 */
int32_t sclCopyValueNodeValue(SValueNode *pNode, void **res) {
  const SDataType *pType = &pNode->node.resType;

  // A NULL-typed node carries no value to copy.
  if (TSDB_DATA_TYPE_NULL == pType->type) {
    return TSDB_CODE_SUCCESS;
  }

  *res = taosMemoryMalloc(pType->bytes);
  if (NULL == (*res)) {
    sclError("malloc %d failed", pType->bytes);
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  memcpy(*res, nodesGetValueFromNode(pNode), pType->bytes);
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
183 184
/*
 * Fills one SScalarParam from an expression node.
 *
 * node    source node; handled kinds are LEFT_VALUE, VALUE, NODE_LIST,
 *         COLUMN and FUNCTION/OPERATOR/LOGIC_CONDITION (any other kind
 *         leaves param untouched apart from param->param)
 * param   parameter to initialise
 * ctx     scalar context supplying the data blocks (pBlockList) and the
 *         per-node result hash (pRes)
 * rowNum  in/out running row count; raised to param->numOfRows when that is
 *         larger
 *
 * Returns TSDB_CODE_SUCCESS or an error code. Parameters built from COLUMN
 * nodes point at the block's own column data, and those built from
 * FUNCTION/OPERATOR/LOGIC_CONDITION nodes are a struct copy of the cached
 * result, so neither owns what it points to.
 */
int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t *rowNum) {
  switch (nodeType(node)) {
    case QUERY_NODE_LEFT_VALUE: {
      SSDataBlock* pb = taosArrayGetP(ctx->pBlockList, 0);
      param->numOfRows = pb->info.rows;
      break;
    }
    case QUERY_NODE_VALUE: {
      SValueNode *valueNode = (SValueNode *)node;

      param->numOfRows = 1;
      param->columnData = createColumnInfoData(&valueNode->node.resType, 1);
      if (NULL == param->columnData) {
        // createColumnInfoData() returns NULL on allocation failure; the
        // append calls below would dereference it.
        sclError("create column for value node failed, type:%d", valueNode->node.resType.type);
        SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      if (TSDB_DATA_TYPE_NULL == valueNode->node.resType.type) {
        colDataAppendNULL(param->columnData, 0);
      } else {
        colDataAppend(param->columnData, 0, nodesGetValueFromNode(valueNode), false);
      }
      break;
    }
    case QUERY_NODE_NODE_LIST: {
      SNodeListNode *nodeList = (SNodeListNode *)node;
      if (LIST_LENGTH(nodeList->pNodeList) <= 0) {
        sclError("invalid length in nodeList, length:%d", LIST_LENGTH(nodeList->pNodeList));
        SCL_RET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      SCL_ERR_RET(scalarGenerateSetFromList((void **)&param->pHashFilter, node, nodeList->dataType.type));
      if (taosHashPut(ctx->pRes, &node, POINTER_BYTES, param, sizeof(*param))) {
        taosHashCleanup(param->pHashFilter);
        param->pHashFilter = NULL;  // do not leave a pointer to the destroyed set behind
        sclError("taosHashPut nodeList failed, size:%d", (int32_t)sizeof(*param));
        return TSDB_CODE_QRY_OUT_OF_MEMORY;
      }
      break;
    }
    case QUERY_NODE_COLUMN: {
      if (NULL == ctx->pBlockList) {
        sclError("invalid node type for constant calculating, type:%d, src:%p", nodeType(node), ctx->pBlockList);
        SCL_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
      }

      SColumnNode *ref = (SColumnNode *)node;

      // Locate the data block whose id matches the column reference.
      int32_t index = -1;
      for(int32_t i = 0; i < taosArrayGetSize(ctx->pBlockList); ++i) {
        SSDataBlock* pb = taosArrayGetP(ctx->pBlockList, i);
        if (pb->info.blockId == ref->dataBlockId) {
          index = i;
          break;
        }
      }

      if (index == -1) {
        sclError("column tupleId is too big, tupleId:%d, dataBlockNum:%d", ref->dataBlockId, (int32_t)taosArrayGetSize(ctx->pBlockList));
        SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      SSDataBlock *block = *(SSDataBlock **)taosArrayGet(ctx->pBlockList, index);
      if (NULL == block || ref->slotId >= block->info.numOfCols) {
        // block may be NULL here, so it must not be dereferenced for the log.
        int32_t blockSize = (NULL == block) ? 0 : (int32_t)taosArrayGetSize(block->pDataBlock);
        sclError("column slotId is too big, slodId:%d, dataBlockSize:%d", ref->slotId, blockSize);
        SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      SColumnInfoData *columnData = (SColumnInfoData *)taosArrayGet(block->pDataBlock, ref->slotId);
      param->numOfRows = block->info.rows;
      param->columnData = columnData;
      break;
    }
    case QUERY_NODE_FUNCTION:
    case QUERY_NODE_OPERATOR:
    case QUERY_NODE_LOGIC_CONDITION: {
      // Sub-expressions were evaluated earlier; reuse the cached result.
      SScalarParam *res = (SScalarParam *)taosHashGet(ctx->pRes, &node, POINTER_BYTES);
      if (NULL == res) {
        sclError("no result for node, type:%d, node:%p", nodeType(node), node);
        SCL_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
      }
      *param = *res;
      break;
    }
    default:
      break;
  }

  // A one-row parameter may be combined with a longer one; two different
  // multi-row counts are rejected.
  if (param->numOfRows > *rowNum) {
    if ((1 != param->numOfRows) && (1 < *rowNum)) {
      sclError("different row nums, rowNum:%d, newRowNum:%d", *rowNum, param->numOfRows);
      SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    *rowNum = param->numOfRows;
  }

  param->param = ctx->param;
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
278
/*
 * Allocates and initialises the parameter array for a function or logic
 * node.
 *
 * pParams     receives the new array; NULL when the resulting row count is 0
 * pParamList  parameter nodes, or NULL for a parameterless call (a single
 *             placeholder parameter carrying only the row count is created)
 * ctx         scalar context
 * paramNum    receives the number of array entries
 * rowNum      in/out row count, updated by sclInitParam()
 *
 * In constant-calculation mode only constant parameters are initialised and
 * they are erased from pParamList as they are consumed.
 *
 * Returns TSDB_CODE_SUCCESS or an error code; on error the array is freed.
 */
int32_t sclInitParamList(SScalarParam **pParams, SNodeList* pParamList, SScalarCtx *ctx, int32_t *paramNum, int32_t *rowNum) {
  int32_t code = 0;
  if (NULL == pParamList) {
    if (ctx->pBlockList) {
      // pBlockList stores SSDataBlock pointers (the rest of this file reads
      // it through taosArrayGetP or *(SSDataBlock **)taosArrayGet), so the
      // element has to be dereferenced to reach the block.
      SSDataBlock *pBlock = taosArrayGetP(ctx->pBlockList, 0);
      *rowNum = pBlock->info.rows;
    } else {
      *rowNum = 1;
    }

    *paramNum = 1;
  } else {
    *paramNum = pParamList->length;
  }

  SScalarParam *paramList = taosMemoryCalloc(*paramNum, sizeof(SScalarParam));
  if (NULL == paramList) {
    sclError("calloc %d failed", (int32_t)((*paramNum) * sizeof(SScalarParam)));
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (pParamList) {
    SNode *tnode = NULL;
    int32_t i = 0;
    if (SCL_IS_CONST_CALC(ctx)) {
      WHERE_EACH (tnode, pParamList) {
        if (!SCL_IS_CONST_NODE(tnode)) {
          WHERE_NEXT;
        } else {
          SCL_ERR_JRET(sclInitParam(tnode, &paramList[i], ctx, rowNum));
          ERASE_NODE(pParamList);
        }

        ++i;
      }
    } else {
      FOREACH(tnode, pParamList) {
        SCL_ERR_JRET(sclInitParam(tnode, &paramList[i], ctx, rowNum));
        ++i;
      }
    }
  } else {
    paramList[0].numOfRows = *rowNum;
  }

  // Nothing to evaluate: hand back NULL so callers can detect the empty case.
  if (0 == *rowNum) {
    taosMemoryFreeClear(paramList);
  }

  *pParams = paramList;
  return TSDB_CODE_SUCCESS;

_return:
  taosMemoryFreeClear(paramList);
  SCL_RET(code);
}

/*
 * Builds the operand array (one or two entries) for an operator node.
 *
 * pParams  receives the newly allocated array on success
 * node     operator node; pLeft is always required, pRight only for
 *          two-operand operators
 * ctx      scalar context forwarded to sclInitParam()
 * rowNum   in/out row count, updated by sclInitParam()
 *
 * Returns TSDB_CODE_SUCCESS or an error code; on error the array is freed.
 */
int32_t sclInitOperatorParams(SScalarParam **pParams, SOperatorNode *node, SScalarCtx *ctx, int32_t *rowNum) {
  int32_t code = 0;
  int32_t paramNum = scalarGetOperatorParamNum(node->opType);

  if (NULL == node->pLeft || (paramNum == 2 && NULL == node->pRight)) {
    sclError("invalid operation node, left:%p, right:%p", node->pLeft, node->pRight);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SScalarParam *operands = taosMemoryCalloc(paramNum, sizeof(SScalarParam));
  if (NULL == operands) {
    sclError("calloc %d failed", (int32_t)(paramNum * sizeof(SScalarParam)));
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  // Left operand always exists; the right one only for binary operators.
  SCL_ERR_JRET(sclInitParam(node->pLeft, &operands[0], ctx, rowNum));
  if (paramNum > 1) {
    SCL_ERR_JRET(sclInitParam(node->pRight, &operands[1], ctx, rowNum));
  }

  *pParams = operands;
  return TSDB_CODE_SUCCESS;

_return:
  taosMemoryFreeClear(operands);
  SCL_RET(code);
}

362
/*
 * Evaluates a scalar function node.
 *
 * node    function node to execute
 * ctx     scalar context used to build the parameters
 * output  receives the result; for built-in functions a result column sized
 *         for the computed row count is created here, for user-defined
 *         functions output is passed straight to callUdfScalarFunc()
 *
 * Returns TSDB_CODE_SUCCESS or the first error code encountered. The
 * parameter array is always freed before returning.
 */
int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *output) {
  SScalarParam *params = NULL;
  int32_t rowNum = 0;
  int32_t paramNum = 0;
  int32_t code = 0;
  SCL_ERR_RET(sclInitParamList(&params, node->pParameterList, ctx, &paramNum, &rowNum));

  if (fmIsUserDefinedFunc(node->funcId)) {
    code = callUdfScalarFunc(node->functionName, params, paramNum, output);
    if (code != 0) {
      sclError("fmExecFunction error. callUdfScalarFunc. function name: %s, udf code:%d", node->functionName, code);
      goto _return;
    }
  } else {
    SScalarFuncExecFuncs ffpSet = {0};
    code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet);
    if (code) {
      sclError("fmGetFuncExecFuncs failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
      SCL_ERR_JRET(code);
    }

    output->columnData = createColumnInfoData(&node->node.resType, rowNum);
    if (output->columnData == NULL) {
      // The column is NULL in this branch, so the size for the log message is
      // taken from the node's result type instead of the column.
      sclError("calloc %d failed", (int32_t)(rowNum * node->node.resType.bytes));
      SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    code = (*ffpSet.process)(params, paramNum, output);
    if (code) {
      sclError("scalar function exec failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
      SCL_ERR_JRET(code);
    }
  }

_return:
  // TODO(review): only the parameter array is released here; column data
  // attached to individual parameters is not freed by this function.
  taosMemoryFreeClear(params);
  SCL_RET(code);
}

/*
 * Evaluates a logic condition node (AND / OR / NOT) row by row.
 *
 * node    logic node; must have a non-empty parameter list, a BOOL result
 *         type, and at most one parameter for NOT
 * ctx     scalar context used to build the parameters
 * output  receives a BOOL result column with one entry per row; when there
 *         is nothing to evaluate, or (in constant-calculation mode) the
 *         expression could not be fully evaluated, output->numOfRows is 0
 *         and output holds no column
 *
 * Returns TSDB_CODE_SUCCESS or an error code. The parameter array is always
 * freed before returning.
 */
int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *output) {
  if (NULL == node->pParameterList || node->pParameterList->length <= 0) {
    sclError("invalid logic parameter list, list:%p, paramNum:%d", node->pParameterList, node->pParameterList ? node->pParameterList->length : 0);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (TSDB_DATA_TYPE_BOOL != node->node.resType.type) {
    sclError("invalid logic resType, type:%d", node->node.resType.type);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (LOGIC_COND_TYPE_NOT == node->condType && node->pParameterList->length > 1) {
    sclError("invalid NOT operation parameter number, paramNum:%d", node->pParameterList->length);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SScalarParam *params = NULL;
  int32_t rowNum = 0;
  int32_t paramNum = 0;
  int32_t code = 0;
  SCL_ERR_RET(sclInitParamList(&params, node->pParameterList, ctx, &paramNum, &rowNum));
  if (NULL == params) {
    // sclInitParamList() hands back NULL when the row count is 0.
    output->numOfRows = 0;
    return TSDB_CODE_SUCCESS;
  }

  int32_t type = node->node.resType.type;
  output->numOfRows = rowNum;

  SDataType t = {.type = type, .bytes = tDataTypes[type].bytes};
  output->columnData = createColumnInfoData(&t, rowNum);
  if (output->columnData == NULL) {
    sclError("calloc %d failed", (int32_t)(rowNum * sizeof(bool)));
    SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  bool value = false;
  bool complete = true;
  for (int32_t i = 0; i < rowNum; ++i) {
    complete = true;
    for (int32_t m = 0; m < paramNum; ++m) {
      // A parameter without column data was not initialised (non-constant
      // operand in constant-calculation mode); the row stays undecided
      // unless a later operand short-circuits it.
      if (NULL == params[m].columnData) {
        complete = false;
        continue;
      }
      char* p = colDataGetData(params[m].columnData, i);
      GET_TYPED_DATA(value, bool, params[m].columnData->info.type, p);

      if (LOGIC_COND_TYPE_AND == node->condType && (false == value)) {
        complete = true;  // one false operand decides AND
        break;
      } else if (LOGIC_COND_TYPE_OR == node->condType && value) {
        complete = true;  // one true operand decides OR
        break;
      } else if (LOGIC_COND_TYPE_NOT == node->condType) {
        value = !value;
      }
    }

    if (complete) {
      colDataAppend(output->columnData, i, (char*) &value, false);
    }
  }

  if (SCL_IS_CONST_CALC(ctx) && (false == complete)) {
    sclFreeParam(output);
    // sclFreeParam() does not reset the pointer; clear it so that callers
    // which store or free `output` afterwards cannot touch freed memory.
    output->columnData = NULL;
    output->numOfRows = 0;
  }

_return:
  // TODO(review): only the parameter array is released here; column data
  // attached to individual parameters is not freed by this function.
  taosMemoryFreeClear(params);
  SCL_RET(code);
}

/*
 * Evaluates an operator node.
 *
 * node    operator node; its operands are resolved via sclInitOperatorParams()
 * ctx     scalar context
 * output  receives a newly created result column sized for the computed row
 *         count; any column previously referenced by output->columnData is
 *         overwritten, not freed
 *
 * Returns TSDB_CODE_SUCCESS or an error code. The operand array is always
 * freed before returning.
 */
int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *output) {
  SScalarParam *params = NULL;
  int32_t rowNum = 0;
  int32_t code = 0;
  // Computed up front so that no `goto _return` can jump across its
  // initialisation.
  int32_t paramNum = scalarGetOperatorParamNum(node->opType);

  SCL_ERR_RET(sclInitOperatorParams(&params, node, ctx, &rowNum));
  output->columnData = createColumnInfoData(&node->node.resType, rowNum);
  if (output->columnData == NULL) {
    sclError("calloc failed, size:%d", (int32_t)rowNum * node->node.resType.bytes);
    SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  // NOTE(review): the lookup result is called without a NULL check -- confirm
  // getBinScalarOperatorFn() always yields a function for valid operators.
  _bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(node->opType);

  SScalarParam* pLeft = &params[0];
  SScalarParam* pRight = paramNum > 1 ? &params[1] : NULL;

  OperatorFn(pLeft, pRight, output, TSDB_ORDER_ASC);

_return:
  // TODO(review): only the operand array is released here; column data
  // attached to individual operands is not freed by this function.
  taosMemoryFreeClear(params);
  SCL_RET(code);
}

D
dapan1121 已提交
514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535
/*
 * Replaces *pNode with a constant value node chosen by operator class:
 * operators up to OP_TYPE_CALC_MAX become a NULL-typed value, all others
 * become the boolean constant false.
 *
 * The previous node is destroyed. On allocation failure ctx->code is set to
 * TSDB_CODE_QRY_OUT_OF_MEMORY, *pNode is left untouched and DEAL_RES_ERROR
 * is returned.
 */
EDealRes sclRewriteBasedOnOptr(SNode** pNode, SScalarCtx *ctx, EOperatorType opType) {
  SValueNode *replacement = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
  if (NULL == replacement) {
    sclError("make value node failed");
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  if (opType <= OP_TYPE_CALC_MAX) {
    replacement->node.resType.type = TSDB_DATA_TYPE_NULL;
  } else {
    replacement->node.resType.type = TSDB_DATA_TYPE_BOOL;
    replacement->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes;
    replacement->datum.b = false;
  }

  nodesDestroyNode(*pNode);
  *pNode = (SNode*)replacement;

  return DEAL_RES_CONTINUE;
}


D
dapan 已提交
547
/*
 * Simplifies an operator node whose operands are not all constant.
 *
 *  - A NULL value operand (for anything but IS NULL / IS NOT NULL) replaces
 *    the whole operator via sclRewriteBasedOnOptr().
 *  - A string value operand compared against a TIMESTAMP expression is
 *    converted to a timestamp value in place.
 *  - For a node-list right operand, NULL entries are dropped for IN; for any
 *    other operator a NULL entry, and for every operator an emptied list,
 *    replaces the whole operator.
 *
 * Returns DEAL_RES_CONTINUE, or whatever sclRewriteBasedOnOptr() returns.
 */
EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) {
  SOperatorNode *node = (SOperatorNode *)*pNode;
  const EOperatorType opType = node->opType;
  const bool isNullTest = (OP_TYPE_IS_NULL == opType) || (OP_TYPE_IS_NOT_NULL == opType);

  // Left operand is a literal value.
  if (node->pLeft && (QUERY_NODE_VALUE == nodeType(node->pLeft))) {
    SValueNode *leftVal = (SValueNode *)node->pLeft;
    if (SCL_IS_NULL_VALUE_NODE(leftVal) && !isNullTest) {
      return sclRewriteBasedOnOptr(pNode, ctx, opType);
    }

    if (IS_STR_DATA_TYPE(leftVal->node.resType.type) && node->pRight && nodesIsExprNode(node->pRight)
      && ((SExprNode*)node->pRight)->resType.type == TSDB_DATA_TYPE_TIMESTAMP) {
      sclConvertToTsValueNode(((SExprNode*)node->pRight)->resType.precision, leftVal);
    }
  }

  // Right operand is a literal value.
  if (node->pRight && (QUERY_NODE_VALUE == nodeType(node->pRight))) {
    SValueNode *rightVal = (SValueNode *)node->pRight;
    if (SCL_IS_NULL_VALUE_NODE(rightVal) && !isNullTest) {
      return sclRewriteBasedOnOptr(pNode, ctx, opType);
    }

    if (IS_STR_DATA_TYPE(rightVal->node.resType.type) && node->pLeft && nodesIsExprNode(node->pLeft)
      && ((SExprNode*)node->pLeft)->resType.type == TSDB_DATA_TYPE_TIMESTAMP) {
      sclConvertToTsValueNode(((SExprNode*)node->pLeft)->resType.precision, rightVal);
    }
  }

  // Right operand is a value list.
  if (node->pRight && (QUERY_NODE_NODE_LIST == nodeType(node->pRight))) {
    SNodeListNode *listNode = (SNodeListNode *)node->pRight;
    SNode* tnode = NULL;
    WHERE_EACH(tnode, listNode->pNodeList) {
      if (!SCL_IS_NULL_VALUE_NODE(tnode)) {
        WHERE_NEXT;
        continue;
      }

      if (opType != OP_TYPE_IN) {
        return sclRewriteBasedOnOptr(pNode, ctx, opType);
      }

      // IN: a NULL entry is simply removed from the list.
      ERASE_NODE(listNode->pNodeList);
    }

    if (listNode->pNodeList->length <= 0) {
      return sclRewriteBasedOnOptr(pNode, ctx, opType);
    }
  }

  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
598
/*
 * Constant-folds a scalar function node whose parameters are all constant.
 *
 * Non-scalar functions and functions with at least one non-constant
 * parameter are left untouched. Otherwise the function is executed once and
 * *pNode is replaced by a value node holding the result (a NULL-typed value
 * when the first result row is NULL).
 *
 * Returns DEAL_RES_CONTINUE, or DEAL_RES_ERROR with ctx->code set.
 */
EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) {
  SFunctionNode *node = (SFunctionNode *)*pNode;
  SNode* tnode = NULL;
  if (!fmIsScalarFunc(node->funcId)) {
    return DEAL_RES_CONTINUE;
  }

  FOREACH(tnode, node->pParameterList) {
    if (!SCL_IS_CONST_NODE(tnode)) {
      return DEAL_RES_CONTINUE;
    }
  }

  SScalarParam output = {0};

  ctx->code = sclExecFunction(node, ctx, &output);
  if (ctx->code) {
    return DEAL_RES_ERROR;
  }

  SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
  if (NULL == res) {
    sclError("make value node failed");
    sclFreeParam(&output);
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  res->translate = true;

  if (colDataIsNull_s(output.columnData, 0)) {
    res->node.resType.type = TSDB_DATA_TYPE_NULL;
  } else {
    res->node.resType.type = output.columnData->info.type;
    res->node.resType.bytes = output.columnData->info.bytes;
    res->node.resType.scale = output.columnData->info.scale;
    res->node.resType.precision = output.columnData->info.precision;
    int32_t type = output.columnData->info.type;
    if (IS_VAR_DATA_TYPE(type)) {
      // Variable-length result: the value node gets its own copy.
      res->datum.p = taosMemoryCalloc(res->node.resType.bytes + VARSTR_HEADER_SIZE + 1, 1);
      if (NULL == res->datum.p) {
        // Without this check the memcpy below would write through NULL.
        sclError("calloc %d failed", (int32_t)(res->node.resType.bytes + VARSTR_HEADER_SIZE + 1));
        nodesDestroyNode((SNode*)res);
        sclFreeParam(&output);
        ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
        return DEAL_RES_ERROR;
      }
      memcpy(res->datum.p, output.columnData->pData, varDataTLen(output.columnData->pData));
    } else {
      nodesSetValueNodeValue(res, output.columnData->pData);
    }
  }

  nodesDestroyNode(*pNode);
  *pNode = (SNode*)res;

  sclFreeParam(&output);
  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
651
/*
 * Constant-folds a logic condition node.
 *
 * The condition is evaluated through sclExecLogic(). If that yields no rows
 * the node is kept as is; otherwise *pNode is replaced by a value node
 * carrying the node's result type and the computed value.
 *
 * Returns DEAL_RES_CONTINUE, or DEAL_RES_ERROR with ctx->code set.
 */
EDealRes sclRewriteLogic(SNode** pNode, SScalarCtx *ctx) {
  SLogicConditionNode *logicNode = (SLogicConditionNode *)*pNode;
  SScalarParam result = {0};

  ctx->code = sclExecLogic(logicNode, ctx, &result);
  if (ctx->code) {
    return DEAL_RES_ERROR;
  }

  // Zero rows: the condition could not be reduced to a constant.
  if (0 == result.numOfRows) {
    return DEAL_RES_CONTINUE;
  }

  SValueNode *folded = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
  if (NULL == folded) {
    sclError("make value node failed");
    sclFreeParam(&result);
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  folded->node.resType = logicNode->node.resType;
  folded->translate = true;

  int32_t type = result.columnData->info.type;
  if (!IS_VAR_DATA_TYPE(type)) {
    nodesSetValueNodeValue(folded, result.columnData->pData);
  } else {
    // Variable-length data: hand the buffer over to the value node.
    folded->datum.p = result.columnData->pData;
    result.columnData->pData = NULL;
  }

  nodesDestroyNode(*pNode);
  *pNode = (SNode*)folded;

  sclFreeParam(&result);
  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
690
/*
 * Constant-folds an operator node.
 *
 * If either operand is not constant the node is passed to
 * sclRewriteNonConstOperator(). Otherwise the operator is executed once and
 * *pNode is replaced by a value node holding the result (a NULL-typed value
 * when the first result row is NULL).
 *
 * Returns DEAL_RES_CONTINUE, or DEAL_RES_ERROR with ctx->code set.
 */
EDealRes sclRewriteOperator(SNode** pNode, SScalarCtx *ctx) {
  SOperatorNode *node = (SOperatorNode *)*pNode;

  if ((!SCL_IS_CONST_NODE(node->pLeft)) || (!SCL_IS_CONST_NODE(node->pRight))) {
    return sclRewriteNonConstOperator(pNode, ctx);
  }

  // sclExecOperator() creates the result column itself and overwrites
  // output.columnData, so nothing may be pre-allocated here: an earlier
  // allocation would be leaked.
  SScalarParam output = {0};
  ctx->code = sclExecOperator(node, ctx, &output);
  if (ctx->code) {
    return DEAL_RES_ERROR;
  }

  SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
  if (NULL == res) {
    sclError("make value node failed");
    sclFreeParam(&output);
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  res->translate = true;

  if (colDataIsNull_s(output.columnData, 0)) {
    res->node.resType.type = TSDB_DATA_TYPE_NULL;
  } else {
    res->node.resType = node->node.resType;
    int32_t type = output.columnData->info.type;
    if (IS_VAR_DATA_TYPE(type)) {  // todo refactor
      // Variable-length data: hand the buffer over to the value node.
      res->datum.p = output.columnData->pData;
      output.columnData->pData = NULL;
    } else {
      nodesSetValueNodeValue(res, output.columnData->pData);
    }
  }

  nodesDestroyNode(*pNode);
  *pNode = (SNode*)res;

  sclFreeParam(&output);
  return DEAL_RES_CONTINUE;
}

/*
 * Rewriter callback for constant folding: dispatches function, logic
 * condition and operator nodes to their rewrite routine and leaves every
 * other node kind untouched.
 *
 * pContext is the SScalarCtx of the current calculation.
 */
EDealRes sclConstantsRewriter(SNode** pNode, void* pContext) {
  SScalarCtx *ctx = (SScalarCtx *)pContext;

  switch (nodeType(*pNode)) {
    case QUERY_NODE_FUNCTION:
      return sclRewriteFunction(pNode, ctx);
    case QUERY_NODE_LOGIC_CONDITION:
      return sclRewriteLogic(pNode, ctx);
    case QUERY_NODE_OPERATOR:
      return sclRewriteOperator(pNode, ctx);
    default:
      return DEAL_RES_CONTINUE;
  }
}

D
dapan 已提交
751
/*
 * Walker step for a function node: executes the function and caches its
 * result in ctx->pRes, keyed by the node pointer.
 *
 * Returns DEAL_RES_CONTINUE, or DEAL_RES_ERROR with ctx->code set.
 */
EDealRes sclWalkFunction(SNode* pNode, SScalarCtx *ctx) {
  SFunctionNode *node = (SFunctionNode *)pNode;
  SScalarParam output = {0};

  ctx->code = sclExecFunction(node, ctx, &output);
  if (ctx->code) {
    return DEAL_RES_ERROR;
  }

  if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
    // The result never made it into the cache, so nobody else will release
    // it; free it here instead of leaking it.
    sclFreeParam(&output);
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
768
/*
 * Walker step for a logic condition node: evaluates the condition and caches
 * its result in ctx->pRes, keyed by the node pointer.
 *
 * Returns DEAL_RES_CONTINUE, or DEAL_RES_ERROR with ctx->code set.
 */
EDealRes sclWalkLogic(SNode* pNode, SScalarCtx *ctx) {
  SLogicConditionNode *node = (SLogicConditionNode *)pNode;
  SScalarParam output = {0};

  ctx->code = sclExecLogic(node, ctx, &output);
  if (ctx->code) {
    return DEAL_RES_ERROR;
  }

  if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
    // The result never made it into the cache, so release it here. When
    // sclExecLogic() reports zero rows it either produced no column or has
    // already released it, so only a non-empty result is freed.
    if (output.numOfRows > 0) {
      sclFreeParam(&output);
    }
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
785
/*
 * Walker step for an operator node: executes the operator and caches its
 * result in ctx->pRes, keyed by the node pointer.
 *
 * Returns DEAL_RES_CONTINUE, or DEAL_RES_ERROR with ctx->code set.
 */
EDealRes sclWalkOperator(SNode* pNode, SScalarCtx *ctx) {
  SOperatorNode *node = (SOperatorNode *)pNode;
  SScalarParam output = {0};

  ctx->code = sclExecOperator(node, ctx, &output);
  if (ctx->code) {
    return DEAL_RES_ERROR;
  }

  if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
    // The result never made it into the cache, so nobody else will release
    // it; free it here instead of leaking it.
    sclFreeParam(&output);
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  return DEAL_RES_CONTINUE;
}

802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850
/*
 * Walker step for a target node: copies the cached result of the target's
 * expression into the destination column (dataBlockId / slotId) and removes
 * the result from the cache.
 *
 * Returns DEAL_RES_CONTINUE, or DEAL_RES_ERROR with ctx->code set when the
 * block or slot cannot be resolved or no cached result exists.
 */
EDealRes sclWalkTarget(SNode* pNode, SScalarCtx *ctx) {
  STargetNode *target = (STargetNode *)pNode;

  if (target->dataBlockId >= taosArrayGetSize(ctx->pBlockList)) {
    sclError("target tupleId is too big, tupleId:%d, dataBlockNum:%d", target->dataBlockId, (int32_t)taosArrayGetSize(ctx->pBlockList));
    ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
    return DEAL_RES_ERROR;
  }

  // Find the position of the block whose id matches the target.
  int32_t blockIdx = -1;
  for (int32_t i = 0; blockIdx < 0 && i < taosArrayGetSize(ctx->pBlockList); ++i) {
    SSDataBlock* candidate = taosArrayGetP(ctx->pBlockList, i);
    if (candidate->info.blockId == target->dataBlockId) {
      blockIdx = i;
    }
  }

  if (blockIdx < 0) {
    sclError("column tupleId is too big, tupleId:%d, dataBlockNum:%d", target->dataBlockId, (int32_t)taosArrayGetSize(ctx->pBlockList));
    ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
    return DEAL_RES_ERROR;
  }

  SSDataBlock *block = taosArrayGetP(ctx->pBlockList, blockIdx);

  if (target->slotId >= taosArrayGetSize(block->pDataBlock)) {
    sclError("target slot not exist, dataBlockId:%d, slotId:%d, dataBlockNum:%d", target->dataBlockId, target->slotId, (int32_t)taosArrayGetSize(block->pDataBlock));
    ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
    return DEAL_RES_ERROR;
  }

  // The slot id was range-checked above, so this lookup stays inside the
  // block's column array.
  SColumnInfoData *dstCol = taosArrayGet(block->pDataBlock, target->slotId);

  SScalarParam *cached = (SScalarParam *)taosHashGet(ctx->pRes, (void *)&target->pExpr, POINTER_BYTES);
  if (NULL == cached) {
    sclError("no valid res in hash, node:%p, type:%d", target->pExpr, nodeType(target->pExpr));
    ctx->code = TSDB_CODE_QRY_APP_ERROR;
    return DEAL_RES_ERROR;
  }

  colDataAssign(dstCol, cached->columnData, cached->numOfRows);
  block->info.rows = cached->numOfRows;

  // The result has been consumed: release it and drop the cache entry.
  sclFreeParam(cached);
  taosHashRemove(ctx->pRes, (void *)&target->pExpr, POINTER_BYTES);
  return DEAL_RES_CONTINUE;
}
D
dapan 已提交
851

D
dapan1121 已提交
852
/*
 * Node-walker callback that dispatches on the node type.
 *
 * Value, node-list, column and left-value nodes are passed over with
 * DEAL_RES_CONTINUE. Function, logic-condition, operator and target nodes
 * are forwarded to their sclWalk* handler, whose result is returned. Any
 * other node type is logged, sets ctx->code to TSDB_CODE_QRY_INVALID_INPUT
 * and yields DEAL_RES_ERROR.
 *
 * pContext must point to the SScalarCtx of the current calculation.
 */
EDealRes sclCalcWalker(SNode* pNode, void* pContext) {
  SScalarCtx *pCtx = (SScalarCtx *)pContext;

  switch (nodeType(pNode)) {
    case QUERY_NODE_VALUE:
    case QUERY_NODE_NODE_LIST:
    case QUERY_NODE_COLUMN:
    case QUERY_NODE_LEFT_VALUE:
      // nothing is computed for these node types
      return DEAL_RES_CONTINUE;

    case QUERY_NODE_FUNCTION:
      return sclWalkFunction(pNode, pCtx);

    case QUERY_NODE_LOGIC_CONDITION:
      return sclWalkLogic(pNode, pCtx);

    case QUERY_NODE_OPERATOR:
      return sclWalkOperator(pNode, pCtx);

    case QUERY_NODE_TARGET:
      return sclWalkTarget(pNode, pCtx);

    default:
      break;
  }

  sclError("invalid node type for scalar calculating, type:%d", nodeType(pNode));
  pCtx->code = TSDB_CODE_QRY_INVALID_INPUT;
  return DEAL_RES_ERROR;
}

/*
 * Rewrites the expression tree rooted at pNode with sclConstantsRewriter
 * (post-order) and, on success, hands the resulting root back through *pRes.
 *
 * pNode  root of the expression tree; must not be NULL.
 * pRes   receives the (possibly replaced) root on success; must not be NULL.
 *        It is left untouched on failure.
 *
 * Returns 0 on success, TSDB_CODE_QRY_INVALID_INPUT for NULL arguments,
 * TSDB_CODE_QRY_OUT_OF_MEMORY if the result table cannot be created, or the
 * code recorded in the context by the rewriter.
 */
int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes) {
  // pRes is written unconditionally on success, so reject NULL up front
  // instead of crashing after the rewrite has already run.
  if (NULL == pNode || NULL == pRes) {
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t code = 0;
  SScalarCtx ctx = {0};
  ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (NULL == ctx.pRes) {
    sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM);
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  // The rewriter receives &pNode, so the local root pointer may be replaced.
  nodesRewriteExprPostOrder(&pNode, sclConstantsRewriter, (void *)&ctx);
  SCL_ERR_JRET(ctx.code);
  *pRes = pNode;

_return:
  sclFreeRes(ctx.pRes);
  return code;
}

D
dapan 已提交
901
/*
 * Evaluates the expression tree rooted at pNode against pBlockList by walking
 * it post-order with sclCalcWalker.
 *
 * pNode       root of the expression tree; must not be NULL.
 * pBlockList  data blocks made available to the walker; must not be NULL.
 * pDst        optional. When non-NULL, pDst->param is passed into the context
 *             and the result stored for pNode is assigned to pDst->columnData,
 *             with pDst->numOfRows set to the result's row count.
 *
 * Returns 0 on success, TSDB_CODE_QRY_INVALID_INPUT for NULL pNode/pBlockList,
 * TSDB_CODE_QRY_OUT_OF_MEMORY if the result table cannot be created,
 * TSDB_CODE_QRY_APP_ERROR if pDst is given but no result exists for pNode, or
 * the code recorded in the context by the walker.
 *
 * The expression tree itself is not destroyed by this function.
 */
int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
  if (NULL == pNode || NULL == pBlockList) {
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t code = 0;
  SScalarCtx ctx = {.code = 0, .pBlockList = pBlockList, .param = pDst ? pDst->param : NULL};

  // TODO: OPT performance
  ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (NULL == ctx.pRes) {
    sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM);
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  nodesWalkExprPostOrder(pNode, sclCalcWalker, (void *)&ctx);
  SCL_ERR_JRET(ctx.code);

  if (pDst) {
    // The root's result is keyed by the root node pointer.
    SScalarParam *res = (SScalarParam *)taosHashGet(ctx.pRes, (void *)&pNode, POINTER_BYTES);
    if (NULL == res) {
      sclError("no valid res in hash, node:%p, type:%d", pNode, nodeType(pNode));
      SCL_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }

    colDataAssign(pDst->columnData, res->columnData, res->numOfRows);
    pDst->numOfRows = res->numOfRows;

    // Release the result before removing its entry, as sclWalkTarget does.
    // Once removed, the sclFreeRes() call below can no longer reach it.
    sclFreeParam(res);
    taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES);
  }

_return:
  sclFreeRes(ctx.pRes);
  return code;
}