scalar.c 27.9 KB
Newer Older
D
dapan1121 已提交
1 2
#include "function.h"
#include "functionMgt.h"
H
Haojun Liao 已提交
3 4
#include "nodes.h"
#include "querynodes.h"
D
dapan1121 已提交
5
#include "sclInt.h"
H
Haojun Liao 已提交
6 7 8
#include "sclvector.h"
#include "tcommon.h"
#include "tdatablock.h"
9
#include "scalar.h"
S
slzhou 已提交
10
#include "tudf.h"
D
fix bug  
dapan1121 已提交
11
#include "ttime.h"
D
dapan1121 已提交
12

D
dapan1121 已提交
13
/**
 * Report how many operands an operator consumes.
 *
 * The IS [NOT] NULL / TRUE / FALSE / UNKNOWN predicates and unary minus take
 * one operand; every other operator type is treated as binary.
 *
 * @param type operator type to classify
 * @return 1 for the unary operators listed above, otherwise 2
 */
int32_t scalarGetOperatorParamNum(EOperatorType type) {
  switch (type) {
    case OP_TYPE_IS_NULL:
    case OP_TYPE_IS_NOT_NULL:
    case OP_TYPE_IS_TRUE:
    case OP_TYPE_IS_NOT_TRUE:
    case OP_TYPE_IS_FALSE:
    case OP_TYPE_IS_NOT_FALSE:
    case OP_TYPE_IS_UNKNOWN:
    case OP_TYPE_IS_NOT_UNKNOWN:
    case OP_TYPE_MINUS:
      return 1;
    default:
      return 2;
  }
}

D
dapan 已提交
23 24 25 26 27 28 29
/**
 * Turn a string value node into a timestamp value node in place.
 *
 * The string held in datum.p is parsed with the given precision; when parsing
 * fails the timestamp falls back to 0. The original string buffer is released
 * and the node's result type becomes TSDB_DATA_TYPE_TIMESTAMP.
 *
 * @param precision timestamp precision used for parsing
 * @param valueNode node to convert; datum.p must be a heap string owned by it
 */
void sclConvertToTsValueNode(int8_t precision, SValueNode* valueNode) {
  // Remember the string first: the conversion stores its result in datum.i,
  // after which datum.p is not read again (presumably shared storage - confirm).
  char *literal = valueNode->datum.p;

  int32_t rc = convertStringToTimestamp(valueNode->node.resType.type, valueNode->datum.p, precision,
                                        &valueNode->datum.i);
  if (TSDB_CODE_SUCCESS != rc) {
    valueNode->datum.i = 0;
  }

  taosMemoryFree(literal);
  valueNode->typeData = valueNode->datum.i;

  valueNode->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes;
  valueNode->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
}


37
/**
 * Allocate a column descriptor of the given type with room for numOfRows rows.
 *
 * @param pType     type/bytes/scale/precision copied into the new column info
 * @param numOfRows capacity requested from colInfoDataEnsureCapacity
 * @return the new column, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY when
 *         either allocation step fails
 */
SColumnInfoData* createColumnInfoData(SDataType* pType, int32_t numOfRows) {
  SColumnInfoData* pCol = taosMemoryCalloc(1, sizeof(SColumnInfoData));
  if (NULL == pCol) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pCol->info.type = pType->type;
  pCol->info.bytes = pType->bytes;
  pCol->info.scale = pType->scale;
  pCol->info.precision = pType->precision;

  if (TSDB_CODE_SUCCESS == colInfoDataEnsureCapacity(pCol, 0, numOfRows)) {
    return pCol;
  }

  // Capacity reservation failed: report OOM and drop the descriptor.
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  taosMemoryFree(pCol);
  return NULL;
}

59 60 61 62
/**
 * Convert a single constant value into the type described by out->columnData.
 *
 * A temporary one-row input column is built from the value node, converted
 * with vectorConvertImpl into out, and then released.
 *
 * @param pValueNode source constant
 * @param out        destination; out->columnData must be allocated and have its
 *                   info.type / info.bytes set by the caller
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_OUT_OF_MEMORY when the temporary
 *         column cannot be created, or the error reported by
 *         colInfoDataEnsureCapacity / vectorConvertImpl
 */
int32_t doConvertDataType(SValueNode* pValueNode, SScalarParam* out) {
  SScalarParam in = {.numOfRows = 1};
  in.columnData = createColumnInfoData(&pValueNode->node.resType, 1);
  if (NULL == in.columnData) {
    // createColumnInfoData returns NULL on allocation failure; appending to it would crash.
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  colDataAppend(in.columnData, 0, nodesGetValueFromNode(pValueNode), false);

  // Only convert when the output column really has room for the single row.
  int32_t code = colInfoDataEnsureCapacity(out->columnData, 0, 1);
  if (TSDB_CODE_SUCCESS == code) {
    code = vectorConvertImpl(&in, out);
  }

  sclFreeParam(&in);
  return code;
}

D
dapan1121 已提交
71 72 73 74 75 76 77 78 79 80 81 82
int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type) {
  SHashObj *pObj = taosHashInit(256, taosGetDefaultHashFunction(type), true, false);
  if (NULL == pObj) {
    sclError("taosHashInit failed, size:%d", 256);
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  taosHashSetEqualFp(pObj, taosGetDefaultEqualFunction(type)); 

  int32_t code = 0;
  SNodeListNode *nodeList = (SNodeListNode *)pNode;
  SListCell *cell = nodeList->pNodeList->pHead;
H
Haojun Liao 已提交
83
  SScalarParam out = {.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData))};
84

D
dapan1121 已提交
85 86 87 88 89
  int32_t len = 0;
  void *buf = NULL;
  
  for (int32_t i = 0; i < nodeList->pNodeList->length; ++i) {
    SValueNode *valueNode = (SValueNode *)cell->pNode;
D
dapan1121 已提交
90
    
D
dapan1121 已提交
91
    if (valueNode->node.resType.type != type) {
92
      out.columnData->info.type = type;
D
dapan1121 已提交
93 94 95 96 97 98 99 100 101
      if (IS_VAR_DATA_TYPE(type)) {
        if (IS_VAR_DATA_TYPE(valueNode->node.resType.type)) {
          out.columnData->info.bytes = valueNode->node.resType.bytes * TSDB_NCHAR_SIZE;
        } else {
          out.columnData->info.bytes = 64 * TSDB_NCHAR_SIZE;
        }
      } else {
        out.columnData->info.bytes = tDataTypes[type].bytes;
      }
102

103 104
      code = doConvertDataType(valueNode, &out);
      if (code != TSDB_CODE_SUCCESS) {
105
//        sclError("convert data from %d to %d failed", in.type, out.type);
D
dapan1121 已提交
106 107 108 109
        SCL_ERR_JRET(code);
      }

      if (IS_VAR_DATA_TYPE(type)) {
D
dapan1121 已提交
110 111 112
        char* data = colDataGetVarData(out.columnData, 0);
        len = varDataLen(data);
        buf = varDataVal(data);
D
dapan1121 已提交
113 114
      } else {
        len = tDataTypes[type].bytes;
115
        buf = out.columnData->pData;
D
dapan1121 已提交
116 117 118
      }
    } else {
      buf = nodesGetValueFromNode(valueNode);
D
dapan1121 已提交
119 120 121 122 123
      if (IS_VAR_DATA_TYPE(type)) {
        len = varDataLen(buf);
        buf = varDataVal(buf);
      } else {
        len = valueNode->node.resType.bytes;
124
      }
D
dapan1121 已提交
125 126
    }
    
127
    if (taosHashPut(pObj, buf, (size_t)len, NULL, 0)) {
D
dapan1121 已提交
128
      sclError("taosHashPut to set failed");
D
dapan1121 已提交
129 130
      SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
D
dapan1121 已提交
131 132

    cell = cell->pNext;
D
dapan1121 已提交
133 134 135 136 137 138 139 140 141 142
  }

  *data = pObj;
  return TSDB_CODE_SUCCESS;

_return:
  taosHashCleanup(pObj);
  SCL_RET(code);
}

D
dapan1121 已提交
143 144 145 146 147 148 149
/**
 * Release every SScalarParam stored in a result hash, then the hash itself.
 *
 * @param res hash whose entries are SScalarParam values
 */
void sclFreeRes(SHashObj *res) {
  for (void *entry = taosHashIterate(res, NULL); NULL != entry; entry = taosHashIterate(res, entry)) {
    sclFreeParam((SScalarParam *)entry);
  }

  taosHashCleanup(res);
}

D
dapan1121 已提交
157
/**
 * Release the resources referenced by a scalar parameter.
 *
 * Destroys and frees columnData when present and cleans up pHashFilter when
 * present. The pointers inside *param are not reset and *param itself is not
 * freed.
 *
 * @param param parameter whose members are released
 */
void sclFreeParam(SScalarParam *param) {
  if (NULL != param->columnData) {
    colDataDestroy(param->columnData);
    taosMemoryFree(param->columnData);
  }

  if (NULL != param->pHashFilter) {
    taosHashCleanup(param->pHashFilter);
  }
}

D
dapan1121 已提交
168 169 170 171 172
/**
 * Duplicate the raw value of a value node into a freshly allocated buffer.
 *
 * NULL-typed nodes are skipped: *res is left untouched and success is returned.
 *
 * @param pNode source value node
 * @param res   receives a buffer of resType.bytes bytes that the caller must free
 * @return TSDB_CODE_SUCCESS or TSDB_CODE_QRY_OUT_OF_MEMORY
 */
int32_t sclCopyValueNodeValue(SValueNode *pNode, void **res) {
  if (TSDB_DATA_TYPE_NULL != pNode->node.resType.type) {
    void *copy = taosMemoryMalloc(pNode->node.resType.bytes);
    *res = copy;
    if (NULL == copy) {
      sclError("malloc %d failed", pNode->node.resType.bytes);
      SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    memcpy(copy, nodesGetValueFromNode(pNode), pNode->node.resType.bytes);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
183 184 185 186
/**
 * Bind one expression node to a scalar parameter.
 *
 *  - value node:   a new one-row column holding the constant (or NULL)
 *  - node list:    a hash set of the list values, also recorded in ctx->pRes
 *  - column node:  a reference to the matching column of the matching block
 *  - function / operator / logic node: the result previously stored in ctx->pRes
 *  - any other node type leaves *param as it was
 *
 * @param node   node to bind
 * @param param  parameter to fill
 * @param ctx    scalar context (block list, intermediate results, user param)
 * @param rowNum in/out running row count; raised to param->numOfRows when larger
 * @return TSDB_CODE_SUCCESS or a TSDB_CODE_QRY_* error
 */
int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t *rowNum) {
  switch (nodeType(node)) {
    case QUERY_NODE_VALUE: {
      SValueNode *valueNode = (SValueNode *)node;

      param->numOfRows = 1;
      param->columnData = createColumnInfoData(&valueNode->node.resType, 1);
      if (NULL == param->columnData) {
        // Allocation failed; appending to a NULL column would crash.
        sclError("createColumnInfoData failed, type:%d", valueNode->node.resType.type);
        SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      if (TSDB_DATA_TYPE_NULL == valueNode->node.resType.type) {
        colDataAppendNULL(param->columnData, 0);
      } else {
        colDataAppend(param->columnData, 0, nodesGetValueFromNode(valueNode), false);
      }
      break;
    }
    case QUERY_NODE_NODE_LIST: {
      SNodeListNode *nodeList = (SNodeListNode *)node;
      if (LIST_LENGTH(nodeList->pNodeList) <= 0) {
        sclError("invalid length in nodeList, length:%d", LIST_LENGTH(nodeList->pNodeList));
        SCL_RET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      SCL_ERR_RET(scalarGenerateSetFromList((void **)&param->pHashFilter, node, nodeList->dataType.type));
      if (taosHashPut(ctx->pRes, &node, POINTER_BYTES, param, sizeof(*param))) {
        taosHashCleanup(param->pHashFilter);
        // Do not leave a dangling pointer behind for a later sclFreeParam.
        param->pHashFilter = NULL;
        sclError("taosHashPut nodeList failed, size:%d", (int32_t)sizeof(*param));
        return TSDB_CODE_QRY_OUT_OF_MEMORY;
      }
      break;
    }
    case QUERY_NODE_COLUMN: {
      if (NULL == ctx->pBlockList) {
        sclError("invalid node type for constant calculating, type:%d, src:%p", nodeType(node), ctx->pBlockList);
        SCL_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
      }

      SColumnNode *ref = (SColumnNode *)node;

      // Locate the block whose id matches the column's dataBlockId.
      int32_t index = -1;
      for(int32_t i = 0; i < taosArrayGetSize(ctx->pBlockList); ++i) {
        SSDataBlock* pb = taosArrayGetP(ctx->pBlockList, i);
        if (pb->info.blockId == ref->dataBlockId) {
          index = i;
          break;
        }
      }

      if (index == -1) {
        sclError("column tupleId is too big, tupleId:%d, dataBlockNum:%d", ref->dataBlockId, (int32_t)taosArrayGetSize(ctx->pBlockList));
        SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      SSDataBlock *block = *(SSDataBlock **)taosArrayGet(ctx->pBlockList, index);
      if (NULL == block) {
        // Reported separately so the slot-range message below never dereferences NULL.
        sclError("data block is NULL, dataBlockId:%d", ref->dataBlockId);
        SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      if (ref->slotId >= block->info.numOfCols) {
        sclError("column slotId is too big, slodId:%d, dataBlockSize:%d", ref->slotId, (int32_t)taosArrayGetSize(block->pDataBlock));
        SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      SColumnInfoData *columnData = (SColumnInfoData *)taosArrayGet(block->pDataBlock, ref->slotId);
      param->numOfRows = block->info.rows;
      param->columnData = columnData;
      break;
    }
    case QUERY_NODE_FUNCTION:
    case QUERY_NODE_OPERATOR:
    case QUERY_NODE_LOGIC_CONDITION: {
      SScalarParam *res = (SScalarParam *)taosHashGet(ctx->pRes, &node, POINTER_BYTES);
      if (NULL == res) {
        sclError("no result for node, type:%d, node:%p", nodeType(node), node);
        SCL_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
      }
      *param = *res;
      break;
    }
    default:
      break;
  }

  if (param->numOfRows > *rowNum) {
    // A larger row count is only acceptable when the previous count was 0 or 1.
    if ((1 != param->numOfRows) && (1 < *rowNum)) {
      sclError("different row nums, rowNum:%d, newRowNum:%d", *rowNum, param->numOfRows);
      SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    *rowNum = param->numOfRows;
  }

  param->param = ctx->param;
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
273
/**
 * Build the parameter array for a function or logic node.
 *
 * With no parameter list a single placeholder parameter is produced whose row
 * count comes from the first data block (or 1 when there are no blocks). In
 * constant-calculation mode only constant nodes are bound, and each bound node
 * is erased from pParamList.
 *
 * @param pParams    receives the allocated array; NULL when *rowNum ends up 0
 * @param pParamList parameter nodes, may be NULL
 * @param ctx        scalar context
 * @param paramNum   receives the number of array slots
 * @param rowNum     in/out row count shared by all parameters
 * @return TSDB_CODE_SUCCESS or an error; on error the array is freed
 */
int32_t sclInitParamList(SScalarParam **pParams, SNodeList* pParamList, SScalarCtx *ctx, int32_t *paramNum, int32_t *rowNum) {
  int32_t code = 0;
  if (NULL == pParamList) {
    if (ctx->pBlockList) {
      // pBlockList stores SSDataBlock pointers (see its other uses in this file),
      // so the element must be read with taosArrayGetP, not taosArrayGet.
      SSDataBlock *pBlock = taosArrayGetP(ctx->pBlockList, 0);
      *rowNum = pBlock->info.rows;
    } else {
      *rowNum = 1;
    }

    *paramNum = 1;
  } else {
    *paramNum = pParamList->length;
  }

  SScalarParam *paramList = taosMemoryCalloc(*paramNum, sizeof(SScalarParam));
  if (NULL == paramList) {
    sclError("calloc %d failed", (int32_t)((*paramNum) * sizeof(SScalarParam)));
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (pParamList) {
    SNode *tnode = NULL;
    int32_t i = 0;
    if (SCL_IS_CONST_CALC(ctx)) {
      WHERE_EACH (tnode, pParamList) {
        if (!SCL_IS_CONST_NODE(tnode)) {
          WHERE_NEXT;
        } else {
          SCL_ERR_JRET(sclInitParam(tnode, &paramList[i], ctx, rowNum));
          ERASE_NODE(pParamList);
        }

        // Slots of skipped (non-constant) nodes stay zero-initialised.
        ++i;
      }
    } else {
      FOREACH(tnode, pParamList) {
        SCL_ERR_JRET(sclInitParam(tnode, &paramList[i], ctx, rowNum));
        ++i;
      }
    }
  } else {
    paramList[0].numOfRows = *rowNum;
  }

  // No rows to process: hand back NULL so callers can detect the empty case.
  if (0 == *rowNum) {
    taosMemoryFreeClear(paramList);
  }

  *pParams = paramList;
  return TSDB_CODE_SUCCESS;

_return:
  taosMemoryFreeClear(paramList);
  SCL_RET(code);
}

/**
 * Build the one- or two-element parameter array for an operator node.
 *
 * @param pParams receives the allocated array on success (caller frees it)
 * @param node    operator node; pLeft is always required, pRight only for
 *                binary operators
 * @param ctx     scalar context
 * @param rowNum  in/out row count shared by the operands
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT for a missing operand,
 *         TSDB_CODE_QRY_OUT_OF_MEMORY, or the error from sclInitParam
 */
int32_t sclInitOperatorParams(SScalarParam **pParams, SOperatorNode *node, SScalarCtx *ctx, int32_t *rowNum) {
  int32_t code = 0;
  int32_t paramNum = scalarGetOperatorParamNum(node->opType);

  bool missingLeft = (NULL == node->pLeft);
  bool missingRight = (paramNum == 2) && (NULL == node->pRight);
  if (missingLeft || missingRight) {
    sclError("invalid operation node, left:%p, right:%p", node->pLeft, node->pRight);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SScalarParam *operands = taosMemoryCalloc(paramNum, sizeof(SScalarParam));
  if (NULL == operands) {
    sclError("calloc %d failed", (int32_t)(paramNum * sizeof(SScalarParam)));
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCL_ERR_JRET(sclInitParam(node->pLeft, &operands[0], ctx, rowNum));
  if (paramNum > 1) {
    SCL_ERR_JRET(sclInitParam(node->pRight, &operands[1], ctx, rowNum));
  }

  *pParams = operands;
  return TSDB_CODE_SUCCESS;

_return:
  taosMemoryFreeClear(operands);
  SCL_RET(code);
}

357
/**
 * Evaluate a scalar function node.
 *
 * User-defined functions are dispatched to callUdfScalarFunc. Built-in
 * functions get an output column sized for the row count and are run through
 * the process callback obtained from fmGetScalarFuncExecFuncs.
 *
 * @param node   function node to execute
 * @param ctx    scalar context
 * @param output receives the result; on failure after the output column was
 *               created, output->columnData is still set and the caller owns it
 * @return TSDB_CODE_SUCCESS or the error of the failing step
 */
int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *output) {
  SScalarParam *params = NULL;
  int32_t rowNum = 0;
  int32_t paramNum = 0;
  int32_t code = 0;
  SCL_ERR_RET(sclInitParamList(&params, node->pParameterList, ctx, &paramNum, &rowNum));

  if (fmIsUserDefinedFunc(node->funcId)) {
    code = callUdfScalarFunc(node->functionName, params, paramNum, output);
    if (code != 0) {
      sclError("fmExecFunction error. callUdfScalarFunc. function name: %s, udf code:%d", node->functionName, code);
      goto _return;
    }
  } else {
    SScalarFuncExecFuncs ffpSet = {0};
    code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet);
    if (code) {
      sclError("fmGetFuncExecFuncs failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
      SCL_ERR_JRET(code);
    }

    output->columnData = createColumnInfoData(&node->node.resType, rowNum);
    if (output->columnData == NULL) {
      // Size is taken from the node's result type: output->columnData is NULL here.
      sclError("calloc %d failed", (int32_t)(rowNum * node->node.resType.bytes));
      SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    code = (*ffpSet.process)(params, paramNum, output);
    if (code) {
      sclError("scalar function exec failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
      SCL_ERR_JRET(code);
    }
  }

_return:
  // Only the parameter array is released here, not what the parameters reference.
  taosMemoryFreeClear(params);
  SCL_RET(code);
}

/**
 * Evaluate an AND / OR / NOT logic condition row by row.
 *
 * Parameters without column data (slots left unbound by sclInitParamList) mark
 * a row as incomplete, unless AND meets a false value or OR meets a true value,
 * which decides the row on its own. Only complete rows are written to the
 * output column.
 *
 * In constant-calculation mode, when the last evaluated row is incomplete, the
 * output column is released and output->numOfRows is set to 0 so the caller
 * can tell that nothing was folded.
 *
 * @param node   logic node; result type must be BOOL, NOT takes one parameter
 * @param ctx    scalar context
 * @param output receives the boolean result column
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT,
 *         TSDB_CODE_QRY_OUT_OF_MEMORY, or the error from sclInitParamList
 */
int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *output) {
  if (NULL == node->pParameterList || node->pParameterList->length <= 0) {
    sclError("invalid logic parameter list, list:%p, paramNum:%d", node->pParameterList, node->pParameterList ? node->pParameterList->length : 0);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (TSDB_DATA_TYPE_BOOL != node->node.resType.type) {
    sclError("invalid logic resType, type:%d", node->node.resType.type);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (LOGIC_COND_TYPE_NOT == node->condType && node->pParameterList->length > 1) {
    sclError("invalid NOT operation parameter number, paramNum:%d", node->pParameterList->length);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SScalarParam *params = NULL;
  int32_t rowNum = 0;
  int32_t paramNum = 0;
  int32_t code = 0;
  SCL_ERR_RET(sclInitParamList(&params, node->pParameterList, ctx, &paramNum, &rowNum));
  if (NULL == params) {
    // sclInitParamList returns a NULL array when there are no rows.
    output->numOfRows = 0;
    return TSDB_CODE_SUCCESS;
  }

  int32_t type = node->node.resType.type;
  output->numOfRows = rowNum;

  SDataType t = {.type = type, .bytes = tDataTypes[type].bytes};
  output->columnData = createColumnInfoData(&t, rowNum);
  if (output->columnData == NULL) {
    sclError("calloc %d failed", (int32_t)(rowNum * sizeof(bool)));
    SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  bool value = false;
  bool complete = true;
  for (int32_t i = 0; i < rowNum; ++i) {
    complete = true;
    for (int32_t m = 0; m < paramNum; ++m) {
      if (NULL == params[m].columnData) {
        complete = false;
        continue;
      }
      char* p = colDataGetData(params[m].columnData, i);
      GET_TYPED_DATA(value, bool, params[m].columnData->info.type, p);

      if (LOGIC_COND_TYPE_AND == node->condType && (false == value)) {
        // Short circuit: one false operand decides AND even if others are unbound.
        complete = true;
        break;
      } else if (LOGIC_COND_TYPE_OR == node->condType && value) {
        // Short circuit: one true operand decides OR even if others are unbound.
        complete = true;
        break;
      } else if (LOGIC_COND_TYPE_NOT == node->condType) {
        value = !value;
      }
    }

    if (complete) {
      colDataAppend(output->columnData, i, (char*) &value, false);
    }
  }

  if (SCL_IS_CONST_CALC(ctx) && (false == complete)) {
    sclFreeParam(output);
    // sclFreeParam does not reset the pointers; clear them so a later free
    // of this output cannot release the same memory twice.
    output->columnData = NULL;
    output->pHashFilter = NULL;
    output->numOfRows = 0;
  }

_return:
  // Only the parameter array is released here, not what the parameters reference.
  taosMemoryFreeClear(params);
  SCL_RET(code);
}

/**
 * Evaluate an operator node over its one or two operands.
 *
 * @param node   operator node to execute
 * @param ctx    scalar context
 * @param output receives a newly created result column; any previous value of
 *               output->columnData is overwritten without being freed
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_OUT_OF_MEMORY, or the error from
 *         sclInitOperatorParams
 */
int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *output) {
  SScalarParam *params = NULL;
  int32_t rowNum = 0;
  int32_t code = 0;
  // Set before any goto so nothing after _return can see it uninitialised.
  int32_t paramNum = scalarGetOperatorParamNum(node->opType);

  SCL_ERR_RET(sclInitOperatorParams(&params, node, ctx, &rowNum));
  output->columnData = createColumnInfoData(&node->node.resType, rowNum);
  if (output->columnData == NULL) {
    sclError("calloc failed, size:%d", (int32_t)rowNum * node->node.resType.bytes);
    SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  _bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(node->opType);

  SScalarParam* pLeft = &params[0];
  SScalarParam* pRight = paramNum > 1 ? &params[1] : NULL;

  OperatorFn(pLeft, pRight, output, TSDB_ORDER_ASC);

_return:
  // Only the parameter array is released here, not what the parameters reference.
  taosMemoryFreeClear(params);
  SCL_RET(code);
}

D
dapan1121 已提交
509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530
/**
 * Replace *pNode with a constant chosen from the operator category.
 *
 * Operators up to OP_TYPE_CALC_MAX collapse to a NULL-typed value node; all
 * other operators collapse to the boolean constant false. The original node
 * is destroyed.
 *
 * @param pNode  node slot to rewrite
 * @param ctx    scalar context; ctx->code is set on allocation failure
 * @param opType operator type of the node being replaced
 * @return DEAL_RES_CONTINUE on success, DEAL_RES_ERROR when no node could be made
 */
EDealRes sclRewriteBasedOnOptr(SNode** pNode, SScalarCtx *ctx, EOperatorType opType) {
  SValueNode *constant = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
  if (NULL == constant) {
    sclError("make value node failed");
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  if (opType <= OP_TYPE_CALC_MAX) {
    constant->node.resType.type = TSDB_DATA_TYPE_NULL;
  } else {
    constant->node.resType.type = TSDB_DATA_TYPE_BOOL;
    constant->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes;
    constant->datum.b = false;
  }

  nodesDestroyNode(*pNode);
  *pNode = (SNode*)constant;

  return DEAL_RES_CONTINUE;
}


D
dapan 已提交
542
/**
 * Simplify an operator that has at least one non-constant operand.
 *
 *  - A NULL constant on either side (except for IS NULL / IS NOT NULL) replaces
 *    the whole operator via sclRewriteBasedOnOptr.
 *  - A string constant compared against a timestamp expression is converted to
 *    a timestamp constant using the other side's precision.
 *  - For a node-list right operand, NULL entries are erased for IN; a NULL entry
 *    under any other operator, or a list that ends up empty, replaces the whole
 *    operator via sclRewriteBasedOnOptr.
 *
 * @param pNode operator node slot, possibly replaced
 * @param ctx   scalar context
 * @return DEAL_RES_CONTINUE, or the result of sclRewriteBasedOnOptr
 */
EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) {
  SOperatorNode *node = (SOperatorNode *)*pNode;
  bool nullTestOp = (OP_TYPE_IS_NULL == node->opType) || (OP_TYPE_IS_NOT_NULL == node->opType);

  if (node->pLeft && (QUERY_NODE_VALUE == nodeType(node->pLeft))) {
    SValueNode *lhs = (SValueNode *)node->pLeft;
    if (SCL_IS_NULL_VALUE_NODE(lhs) && !nullTestOp) {
      return sclRewriteBasedOnOptr(pNode, ctx, node->opType);
    }

    if (IS_STR_DATA_TYPE(lhs->node.resType.type) && node->pRight && nodesIsExprNode(node->pRight)
      && ((SExprNode*)node->pRight)->resType.type == TSDB_DATA_TYPE_TIMESTAMP) {
      int8_t tsPrecision = ((SExprNode*)node->pRight)->resType.precision;
      sclConvertToTsValueNode(tsPrecision, lhs);
    }
  }

  if (node->pRight && (QUERY_NODE_VALUE == nodeType(node->pRight))) {
    SValueNode *rhs = (SValueNode *)node->pRight;
    if (SCL_IS_NULL_VALUE_NODE(rhs) && !nullTestOp) {
      return sclRewriteBasedOnOptr(pNode, ctx, node->opType);
    }

    if (IS_STR_DATA_TYPE(rhs->node.resType.type) && node->pLeft && nodesIsExprNode(node->pLeft)
      && ((SExprNode*)node->pLeft)->resType.type == TSDB_DATA_TYPE_TIMESTAMP) {
      int8_t tsPrecision = ((SExprNode*)node->pLeft)->resType.precision;
      sclConvertToTsValueNode(tsPrecision, rhs);
    }
  }

  if (node->pRight && (QUERY_NODE_NODE_LIST == nodeType(node->pRight))) {
    SNodeListNode *listNode = (SNodeListNode *)node->pRight;
    SNode* item = NULL;
    WHERE_EACH(item, listNode->pNodeList) {
      if (SCL_IS_NULL_VALUE_NODE(item)) {
        if (node->opType != OP_TYPE_IN) {
          // A NULL entry under any operator other than IN rewrites the whole node.
          return sclRewriteBasedOnOptr(pNode, ctx, node->opType);
        }

        // For IN the NULL entry is simply dropped from the list.
        ERASE_NODE(listNode->pNodeList);
        continue;
      }

      WHERE_NEXT;
    }

    if (listNode->pNodeList->length <= 0) {
      return sclRewriteBasedOnOptr(pNode, ctx, node->opType);
    }
  }

  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
593
/**
 * Constant-fold a scalar function whose parameters are all constant.
 *
 * Non-scalar functions and functions with any non-constant parameter are left
 * untouched. Otherwise the function is executed once and the node is replaced
 * by a value node holding the result (or a NULL-typed node for a NULL result).
 *
 * @param pNode function node slot, replaced on success
 * @param ctx   scalar context; ctx->code carries the error on failure
 * @return DEAL_RES_CONTINUE or DEAL_RES_ERROR
 */
EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) {
  SFunctionNode *node = (SFunctionNode *)*pNode;
  SNode* tnode = NULL;
  if (!fmIsScalarFunc(node->funcId)) {
    return DEAL_RES_CONTINUE;
  }

  FOREACH(tnode, node->pParameterList) {
    if (!SCL_IS_CONST_NODE(tnode)) {
      return DEAL_RES_CONTINUE;
    }
  }

  SScalarParam output = {0};

  ctx->code = sclExecFunction(node, ctx, &output);
  if (ctx->code) {
    // The output column may already exist when execution failed late.
    sclFreeParam(&output);
    return DEAL_RES_ERROR;
  }

  SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
  if (NULL == res) {
    sclError("make value node failed");
    sclFreeParam(&output);
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  res->translate = true;

  if (colDataIsNull_s(output.columnData, 0)) {
    res->node.resType.type = TSDB_DATA_TYPE_NULL;
  } else {
    res->node.resType.type = output.columnData->info.type;
    res->node.resType.bytes = output.columnData->info.bytes;
    res->node.resType.scale = output.columnData->info.scale;
    res->node.resType.precision = output.columnData->info.precision;
    int32_t type = output.columnData->info.type;
    if (IS_VAR_DATA_TYPE(type)) {
      res->datum.p = taosMemoryCalloc(res->node.resType.bytes + VARSTR_HEADER_SIZE + 1, 1);
      if (NULL == res->datum.p) {
        // Without a buffer the copy below would write through NULL.
        sclError("calloc %d failed", (int32_t)(res->node.resType.bytes + VARSTR_HEADER_SIZE + 1));
        nodesDestroyNode((SNode*)res);
        sclFreeParam(&output);
        ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
        return DEAL_RES_ERROR;
      }
      memcpy(res->datum.p, output.columnData->pData, varDataTLen(output.columnData->pData));
    } else {
      nodesSetValueNodeValue(res, output.columnData->pData);
    }
  }

  nodesDestroyNode(*pNode);
  *pNode = (SNode*)res;

  sclFreeParam(&output);
  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
646
/**
 * Constant-fold a logic condition when sclExecLogic could fully evaluate it.
 *
 * An output with zero rows means the condition could not be decided from
 * constants alone; the node is then left unchanged. Otherwise the node is
 * replaced by a value node carrying the result.
 *
 * @param pNode logic node slot, replaced when folding succeeds
 * @param ctx   scalar context; ctx->code carries the error on failure
 * @return DEAL_RES_CONTINUE or DEAL_RES_ERROR
 */
EDealRes sclRewriteLogic(SNode** pNode, SScalarCtx *ctx) {
  SLogicConditionNode *logic = (SLogicConditionNode *)*pNode;
  SScalarParam output = {0};

  ctx->code = sclExecLogic(logic, ctx, &output);
  if (ctx->code) {
    return DEAL_RES_ERROR;
  }

  if (0 == output.numOfRows) {
    return DEAL_RES_CONTINUE;
  }

  SValueNode *folded = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
  if (NULL == folded) {
    sclError("make value node failed");
    sclFreeParam(&output);
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  folded->node.resType = logic->node.resType;
  folded->translate = true;

  int32_t outType = output.columnData->info.type;
  if (!IS_VAR_DATA_TYPE(outType)) {
    nodesSetValueNodeValue(folded, output.columnData->pData);
  } else {
    // Hand the data buffer over to the value node instead of copying it.
    folded->datum.p = output.columnData->pData;
    output.columnData->pData = NULL;
  }

  nodesDestroyNode(*pNode);
  *pNode = (SNode*)folded;

  sclFreeParam(&output);
  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
685
/**
 * Constant-fold an operator node.
 *
 * When either operand is not constant the node is passed to
 * sclRewriteNonConstOperator. Otherwise the operator is executed once and the
 * node is replaced by a value node holding the result (or a NULL-typed node
 * for a NULL result).
 *
 * @param pNode operator node slot, replaced on success
 * @param ctx   scalar context; ctx->code carries the error on failure
 * @return DEAL_RES_CONTINUE or DEAL_RES_ERROR
 */
EDealRes sclRewriteOperator(SNode** pNode, SScalarCtx *ctx) {
  SOperatorNode *node = (SOperatorNode *)*pNode;

  if ((!SCL_IS_CONST_NODE(node->pLeft)) || (!SCL_IS_CONST_NODE(node->pRight))) {
    return sclRewriteNonConstOperator(pNode, ctx);
  }

  // sclExecOperator creates the output column itself, so allocating one here
  // beforehand would only be overwritten and leaked.
  SScalarParam output = {0};
  ctx->code = sclExecOperator(node, ctx, &output);
  if (ctx->code) {
    sclFreeParam(&output);
    return DEAL_RES_ERROR;
  }

  SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
  if (NULL == res) {
    sclError("make value node failed");
    sclFreeParam(&output);
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  res->translate = true;

  if (colDataIsNull_s(output.columnData, 0)) {
    res->node.resType.type = TSDB_DATA_TYPE_NULL;
  } else {
    res->node.resType = node->node.resType;
    int32_t type = output.columnData->info.type;
    if (IS_VAR_DATA_TYPE(type)) {  // todo refactor
      // Hand the data buffer over to the value node instead of copying it.
      res->datum.p = output.columnData->pData;
      output.columnData->pData = NULL;
    } else {
      nodesSetValueNodeValue(res, output.columnData->pData);
    }
  }

  nodesDestroyNode(*pNode);
  *pNode = (SNode*)res;

  sclFreeParam(&output);
  return DEAL_RES_CONTINUE;
}

/**
 * Tree-rewrite callback that dispatches constant folding by node type.
 *
 * Function, logic-condition and operator nodes go to their dedicated rewrite
 * routine; every other node type is passed over unchanged.
 *
 * @param pNode    node slot being visited
 * @param pContext SScalarCtx supplied by the walker
 * @return result of the chosen rewrite routine, or DEAL_RES_CONTINUE
 */
EDealRes sclConstantsRewriter(SNode** pNode, void* pContext) {
  SScalarCtx *ctx = (SScalarCtx *)pContext;

  switch (nodeType(*pNode)) {
    case QUERY_NODE_FUNCTION:
      return sclRewriteFunction(pNode, ctx);
    case QUERY_NODE_LOGIC_CONDITION:
      return sclRewriteLogic(pNode, ctx);
    case QUERY_NODE_OPERATOR:
      return sclRewriteOperator(pNode, ctx);
    default:
      return DEAL_RES_CONTINUE;
  }
}

D
dapan 已提交
746
/**
 * Execute a function node and store its result in ctx->pRes keyed by the node.
 *
 * @param pNode function node
 * @param ctx   scalar context; ctx->code carries the error on failure
 * @return DEAL_RES_CONTINUE or DEAL_RES_ERROR
 */
EDealRes sclWalkFunction(SNode* pNode, SScalarCtx *ctx) {
  SFunctionNode *node = (SFunctionNode *)pNode;
  SScalarParam output = {0};

  ctx->code = sclExecFunction(node, ctx, &output);
  if (ctx->code) {
    // The output column may already exist when execution failed late.
    sclFreeParam(&output);
    return DEAL_RES_ERROR;
  }

  if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
    // The result was not stored, so nobody else will release it.
    sclFreeParam(&output);
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
763
/**
 * Execute a logic condition node and store its result in ctx->pRes keyed by
 * the node.
 *
 * @param pNode logic condition node
 * @param ctx   scalar context; ctx->code carries the error on failure
 * @return DEAL_RES_CONTINUE or DEAL_RES_ERROR
 */
EDealRes sclWalkLogic(SNode* pNode, SScalarCtx *ctx) {
  SLogicConditionNode *node = (SLogicConditionNode *)pNode;
  SScalarParam output = {0};

  ctx->code = sclExecLogic(node, ctx, &output);
  if (ctx->code) {
    return DEAL_RES_ERROR;
  }

  if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
    // The result was not stored, so release it here. sclExecLogic reports
    // numOfRows == 0 when it produced no column or already released it, so
    // only free when rows were actually produced.
    if (output.numOfRows > 0) {
      sclFreeParam(&output);
    }
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
780
/**
 * Execute an operator node and store its result in ctx->pRes keyed by the node.
 *
 * @param pNode operator node
 * @param ctx   scalar context; ctx->code carries the error on failure
 * @return DEAL_RES_CONTINUE or DEAL_RES_ERROR
 */
EDealRes sclWalkOperator(SNode* pNode, SScalarCtx *ctx) {
  SOperatorNode *node = (SOperatorNode *)pNode;
  SScalarParam output = {0};

  ctx->code = sclExecOperator(node, ctx, &output);
  if (ctx->code) {
    // sclFreeParam skips a NULL column, so this is safe even when nothing was allocated.
    sclFreeParam(&output);
    return DEAL_RES_ERROR;
  }

  if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
    // The result was not stored, so nobody else will release it.
    sclFreeParam(&output);
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  return DEAL_RES_CONTINUE;
}

797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845
/**
 * Copy the computed result of a target's expression into its destination
 * column, then drop the intermediate result from ctx->pRes.
 *
 * The destination block is the one whose blockId equals target->dataBlockId;
 * its row count is set to the result's row count.
 *
 * @param pNode target node
 * @param ctx   scalar context; ctx->code carries the error on failure
 * @return DEAL_RES_CONTINUE or DEAL_RES_ERROR
 */
EDealRes sclWalkTarget(SNode* pNode, SScalarCtx *ctx) {
  STargetNode *target = (STargetNode *)pNode;

  if (target->dataBlockId >= taosArrayGetSize(ctx->pBlockList)) {
    sclError("target tupleId is too big, tupleId:%d, dataBlockNum:%d", target->dataBlockId, (int32_t)taosArrayGetSize(ctx->pBlockList));
    ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
    return DEAL_RES_ERROR;
  }

  // Search the block list for the first block with the requested id.
  int32_t blockIdx = -1;
  for (int32_t i = 0; (blockIdx == -1) && (i < taosArrayGetSize(ctx->pBlockList)); ++i) {
    SSDataBlock* candidate = taosArrayGetP(ctx->pBlockList, i);
    if (candidate->info.blockId == target->dataBlockId) {
      blockIdx = i;
    }
  }

  if (blockIdx == -1) {
    sclError("column tupleId is too big, tupleId:%d, dataBlockNum:%d", target->dataBlockId, (int32_t)taosArrayGetSize(ctx->pBlockList));
    ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
    return DEAL_RES_ERROR;
  }

  SSDataBlock *block = *(SSDataBlock **)taosArrayGet(ctx->pBlockList, blockIdx);

  if (target->slotId >= taosArrayGetSize(block->pDataBlock)) {
    sclError("target slot not exist, dataBlockId:%d, slotId:%d, dataBlockNum:%d", target->dataBlockId, target->slotId, (int32_t)taosArrayGetSize(block->pDataBlock));
    ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
    return DEAL_RES_ERROR;
  }

  // The slot range was validated above, so this lookup stays inside pDataBlock.
  SColumnInfoData *dstCol = taosArrayGet(block->pDataBlock, target->slotId);

  SScalarParam *exprRes = (SScalarParam *)taosHashGet(ctx->pRes, (void *)&target->pExpr, POINTER_BYTES);
  if (NULL == exprRes) {
    sclError("no valid res in hash, node:%p, type:%d", target->pExpr, nodeType(target->pExpr));
    ctx->code = TSDB_CODE_QRY_APP_ERROR;
    return DEAL_RES_ERROR;
  }

  colDataAssign(dstCol, exprRes->columnData, exprRes->numOfRows);
  block->info.rows = exprRes->numOfRows;

  // The intermediate result is no longer needed once copied into the block.
  sclFreeParam(exprRes);
  taosHashRemove(ctx->pRes, (void *)&target->pExpr, POINTER_BYTES);
  return DEAL_RES_CONTINUE;
}
D
dapan 已提交
846

D
dapan1121 已提交
847
/*
 * Per-node callback used by scalarCalculate(): dispatches on the node type.
 *
 * Value, node-list and column nodes need no work here and are skipped.
 * Function, logic-condition, operator and target nodes are forwarded to their
 * dedicated sclWalk* handler. Any other node type is rejected with
 * TSDB_CODE_QRY_INVALID_INPUT.
 *
 * pNode:    node being visited.
 * pContext: the SScalarCtx of the current calculation.
 *
 * Returns DEAL_RES_CONTINUE to keep walking, DEAL_RES_ERROR on failure
 * (the error code is left in ctx->code).
 */
EDealRes sclCalcWalker(SNode* pNode, void* pContext) {
  SScalarCtx *ctx = (SScalarCtx *)pContext;

  switch (nodeType(pNode)) {
    case QUERY_NODE_VALUE:
    case QUERY_NODE_NODE_LIST:
    case QUERY_NODE_COLUMN:
      return DEAL_RES_CONTINUE;

    case QUERY_NODE_FUNCTION:
      return sclWalkFunction(pNode, ctx);

    case QUERY_NODE_LOGIC_CONDITION:
      return sclWalkLogic(pNode, ctx);

    case QUERY_NODE_OPERATOR:
      return sclWalkOperator(pNode, ctx);

    case QUERY_NODE_TARGET:
      return sclWalkTarget(pNode, ctx);

    default:
      break;
  }

  sclError("invalid node type for scalar calculating, type:%d", nodeType(pNode));
  ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
  return DEAL_RES_ERROR;
}

/*
 * Runs sclConstantsRewriter over the expression tree rooted at pNode
 * (post-order) and hands the possibly replaced root back through pRes.
 *
 * pNode: root of the expression tree; must not be NULL.
 * pRes:  receives the root after rewriting; written only on success.
 *
 * Returns 0 on success, TSDB_CODE_QRY_INVALID_INPUT for a NULL pNode,
 * TSDB_CODE_QRY_OUT_OF_MEMORY if the result hash cannot be created, or the
 * error code left in the context by the rewriter.
 */
int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes) {
  if (!pNode) {
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t code = 0;

  // Every field other than pRes starts out zeroed.
  SScalarCtx ctx = {
      .pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK)};
  if (!ctx.pRes) {
    sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM);
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  // The rewriter may replace the root, hence the address of the local pNode.
  nodesRewriteExprPostOrder(&pNode, sclConstantsRewriter, (void *)&ctx);
  SCL_ERR_JRET(ctx.code);

  *pRes = pNode;

_return:
  sclFreeRes(ctx.pRes);
  return code;
}

D
dapan 已提交
896
/*
 * Evaluates the expression tree rooted at pNode against the data blocks in
 * pBlockList by walking it post-order with sclCalcWalker.
 *
 * pNode:      root of the expression tree; must not be NULL.
 * pBlockList: data blocks the expression reads from / writes to; must not be NULL.
 * pDst:       optional. When non-NULL, its `param` is passed to the walk through
 *             the context, and the result stored for the root node is assigned
 *             to pDst->columnData with pDst->numOfRows updated.
 *
 * Returns 0 on success, TSDB_CODE_QRY_INVALID_INPUT for NULL pNode/pBlockList,
 * TSDB_CODE_QRY_OUT_OF_MEMORY if the result hash cannot be created,
 * TSDB_CODE_QRY_APP_ERROR if no result was produced for the root node, or the
 * error code left in the context by the walk.
 */
int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
  if (NULL == pNode || NULL == pBlockList) {
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t code = 0;
  // pDst is optional (it is NULL-checked below), so it must not be
  // dereferenced unconditionally when seeding the context.
  SScalarCtx ctx = {.code = 0, .pBlockList = pBlockList, .param = pDst ? pDst->param : NULL};

  // TODO: OPT performance
  ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (NULL == ctx.pRes) {
    sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM);
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  nodesWalkExprPostOrder(pNode, sclCalcWalker, (void *)&ctx);
  SCL_ERR_JRET(ctx.code);

  if (pDst) {
    SScalarParam *res = (SScalarParam *)taosHashGet(ctx.pRes, (void *)&pNode, POINTER_BYTES);
    if (NULL == res) {
      sclError("no valid res in hash, node:%p, type:%d", pNode, nodeType(pNode));
      SCL_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }

    colDataAssign(pDst->columnData, res->columnData, res->numOfRows);
    pDst->numOfRows = res->numOfRows;

    // Release the consumed result before dropping its entry (same sequence as
    // sclWalkTarget); once removed from the hash it is no longer reachable by
    // the sclFreeRes() call below.
    sclFreeParam(res);
    taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES);
  }

_return:
  // pNode is not destroyed here.
  sclFreeRes(ctx.pRes);
  return code;
}