scalar.c 34.3 KB
Newer Older
D
dapan1121 已提交
1 2
#include "function.h"
#include "functionMgt.h"
H
Haojun Liao 已提交
3 4
#include "nodes.h"
#include "querynodes.h"
D
dapan1121 已提交
5
#include "sclInt.h"
H
Haojun Liao 已提交
6 7 8
#include "sclvector.h"
#include "tcommon.h"
#include "tdatablock.h"
9
#include "scalar.h"
S
slzhou 已提交
10
#include "tudf.h"
D
fix bug  
dapan1121 已提交
11
#include "ttime.h"
D
dapan1121 已提交
12

D
dapan1121 已提交
13
// Returns how many operands an operator of the given type takes:
// 1 for the unary operators enumerated below, 2 for every other type.
int32_t scalarGetOperatorParamNum(EOperatorType type) {
  switch (type) {
    case OP_TYPE_IS_NULL:
    case OP_TYPE_IS_NOT_NULL:
    case OP_TYPE_IS_TRUE:
    case OP_TYPE_IS_NOT_TRUE:
    case OP_TYPE_IS_FALSE:
    case OP_TYPE_IS_NOT_FALSE:
    case OP_TYPE_IS_UNKNOWN:
    case OP_TYPE_IS_NOT_UNKNOWN:
    case OP_TYPE_MINUS:
      return 1;
    default:
      return 2;
  }
}

D
dapan1121 已提交
23
// Converts a string value node into a timestamp value node in place.
//   precision - timestamp precision passed through to convertStringToTimestamp
//   valueNode - node whose datum.p string is parsed; on success its datum.i and
//               typeData hold the parsed value and its result type becomes
//               TSDB_DATA_TYPE_TIMESTAMP
// Returns TSDB_CODE_SUCCESS, or the conversion error code, in which case the
// original string is not freed and the result type is left untouched.
int32_t sclConvertToTsValueNode(int8_t precision, SValueNode* valueNode) {
  // Remember the string pointer up front: the conversion writes its result
  // into datum.i, and the string has to be released afterwards.
  char *pOrigStr = valueNode->datum.p;

  int32_t code = convertStringToTimestamp(valueNode->node.resType.type, pOrigStr, precision, &valueNode->datum.i);
  if (TSDB_CODE_SUCCESS == code) {
    taosMemoryFree(pOrigStr);
    valueNode->typeData = valueNode->datum.i;

    valueNode->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
    valueNode->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes;
  }

  return code;
}

H
Haojun Liao 已提交
38
// Allocates a zero-initialised SColumnInfoData, copies type/bytes/scale/precision
// from pType, calls colInfoDataEnsureCapacity for numOfRows rows and attaches
// the column to pParam.
// On success pParam->columnData points at the new column and pParam->type is
// SHOULD_FREE_COLDATA. On failure terrno is set to TSDB_CODE_OUT_OF_MEMORY,
// terrno is returned and pParam is left unmodified.
int32_t sclCreateColumnInfoData(SDataType* pType, int32_t numOfRows, SScalarParam* pParam) {
  SColumnInfoData* pCol = taosMemoryCalloc(1, sizeof(SColumnInfoData));
  if (NULL == pCol) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return terrno;
  }

  pCol->info.type = pType->type;
  pCol->info.bytes = pType->bytes;
  pCol->info.scale = pType->scale;
  pCol->info.precision = pType->precision;

  if (TSDB_CODE_SUCCESS != colInfoDataEnsureCapacity(pCol, numOfRows)) {
    // Any capacity failure is reported as out-of-memory.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pCol);
    return terrno;
  }

  pParam->type = SHOULD_FREE_COLDATA;
  pParam->columnData = pCol;
  return TSDB_CODE_SUCCESS;
}

62 63
// Converts the single value held by pValueNode into the column type already
// configured on out->columnData.
//   pValueNode - source value; wrapped in a temporary one-row input column
//   out        - destination parameter; its columnData must be non-NULL and
//                carry the target type/bytes
// Returns TSDB_CODE_SUCCESS or the first error reported while creating the
// input column, reserving the output row, or converting. The temporary input
// column is always released before returning.
int32_t doConvertDataType(SValueNode* pValueNode, SScalarParam* out) {
  SScalarParam in = {.numOfRows = 1};
  int32_t code = sclCreateColumnInfoData(&pValueNode->node.resType, 1, &in);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  colDataAppend(in.columnData, 0, nodesGetValueFromNode(pValueNode), false);

  // Only convert when the output column could actually be sized; the result of
  // colInfoDataEnsureCapacity used to be ignored here.
  code = colInfoDataEnsureCapacity(out->columnData, 1);
  if (code == TSDB_CODE_SUCCESS) {
    code = vectorConvertImpl(&in, out);
  }

  sclFreeParam(&in);
  return code;
}

D
dapan1121 已提交
78 79 80 81 82 83 84 85 86 87 88 89
int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type) {
  SHashObj *pObj = taosHashInit(256, taosGetDefaultHashFunction(type), true, false);
  if (NULL == pObj) {
    sclError("taosHashInit failed, size:%d", 256);
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  taosHashSetEqualFp(pObj, taosGetDefaultEqualFunction(type)); 

  int32_t code = 0;
  SNodeListNode *nodeList = (SNodeListNode *)pNode;
  SListCell *cell = nodeList->pNodeList->pHead;
H
Haojun Liao 已提交
90
  SScalarParam out = {.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData))};
91

D
dapan1121 已提交
92 93 94 95 96
  int32_t len = 0;
  void *buf = NULL;
  
  for (int32_t i = 0; i < nodeList->pNodeList->length; ++i) {
    SValueNode *valueNode = (SValueNode *)cell->pNode;
D
dapan1121 已提交
97
    
D
dapan1121 已提交
98
    if (valueNode->node.resType.type != type) {
99
      out.columnData->info.type = type;
D
dapan1121 已提交
100 101 102 103 104 105 106 107 108
      if (IS_VAR_DATA_TYPE(type)) {
        if (IS_VAR_DATA_TYPE(valueNode->node.resType.type)) {
          out.columnData->info.bytes = valueNode->node.resType.bytes * TSDB_NCHAR_SIZE;
        } else {
          out.columnData->info.bytes = 64 * TSDB_NCHAR_SIZE;
        }
      } else {
        out.columnData->info.bytes = tDataTypes[type].bytes;
      }
109

110 111
      code = doConvertDataType(valueNode, &out);
      if (code != TSDB_CODE_SUCCESS) {
112
//        sclError("convert data from %d to %d failed", in.type, out.type);
D
dapan1121 已提交
113 114 115 116
        SCL_ERR_JRET(code);
      }

      if (IS_VAR_DATA_TYPE(type)) {
117 118
        buf = colDataGetVarData(out.columnData, 0);
        len = varDataTLen(data);
D
dapan1121 已提交
119 120
      } else {
        len = tDataTypes[type].bytes;
121
        buf = out.columnData->pData;
D
dapan1121 已提交
122 123 124
      }
    } else {
      buf = nodesGetValueFromNode(valueNode);
D
dapan1121 已提交
125
      if (IS_VAR_DATA_TYPE(type)) {
126
        len = varDataTLen(buf);
D
dapan1121 已提交
127 128
      } else {
        len = valueNode->node.resType.bytes;
129
      }
D
dapan1121 已提交
130 131
    }
    
132
    if (taosHashPut(pObj, buf, (size_t)len, NULL, 0)) {
D
dapan1121 已提交
133
      sclError("taosHashPut to set failed");
D
dapan1121 已提交
134 135
      SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
D
dapan1121 已提交
136 137

    cell = cell->pNext;
D
dapan1121 已提交
138 139 140 141 142 143 144 145 146 147
  }

  *data = pObj;
  return TSDB_CODE_SUCCESS;

_return:
  taosHashCleanup(pObj);
  SCL_RET(code);
}

D
dapan1121 已提交
148 149 150 151 152 153 154
// Releases every SScalarParam stored in the result hash via sclFreeParam and
// then destroys the hash itself.
void sclFreeRes(SHashObj *res) {
  for (void *pIter = taosHashIterate(res, NULL); NULL != pIter; pIter = taosHashIterate(res, pIter)) {
    sclFreeParam((SScalarParam *)pIter);
  }

  taosHashCleanup(res);
}

D
dapan1121 已提交
162
// Releases the resources referenced by a scalar parameter: the column data
// (destroyed, freed, and the pointer cleared) and the hash filter (cleaned up;
// the pHashFilter pointer itself is not reset). The param struct is not freed.
void sclFreeParam(SScalarParam *param) {
  if (NULL != param->columnData) {
    colDataDestroy(param->columnData);
    taosMemoryFreeClear(param->columnData);
  }

  if (NULL != param->pHashFilter) {
    taosHashCleanup(param->pHashFilter);
  }
}

D
dapan1121 已提交
173 174 175 176 177
// Copies the raw value of a value node into a freshly allocated buffer.
//   pNode - source node; resType.bytes determines the buffer size
//   res   - out: receives the new buffer. Not written when the node's type is
//           TSDB_DATA_TYPE_NULL; set to NULL when the allocation fails.
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY via SCL_ERR_RET.
int32_t sclCopyValueNodeValue(SValueNode *pNode, void **res) {
  if (TSDB_DATA_TYPE_NULL != pNode->node.resType.type) {
    void *pCopy = taosMemoryMalloc(pNode->node.resType.bytes);
    *res = pCopy;
    if (NULL == pCopy) {
      sclError("malloc %d failed", pNode->node.resType.bytes);
      SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    memcpy(pCopy, nodesGetValueFromNode(pNode), pNode->node.resType.bytes);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
188 189
// Fills `param` with the data source for one expression node.
//   node   - the operand node
//   param  - out: zero-initialised parameter to populate
//   ctx    - scalar context providing the block list, result hash and the
//            current self/peer types
//   rowNum - in/out: running row count; raised to param->numOfRows when that is
//            larger, rejected when both are larger than 1 and differ
// Per node type:
//   LEFT_VALUE            - only numOfRows is taken from the first data block
//   VALUE                 - a one-row column holding the value (or NULL)
//   NODE_LIST             - a hash filter built from the list, also stored in ctx->pRes
//   COLUMN                - points param at the referenced column of a data block
//   FUNCTION/OPERATOR/
//   LOGIC_CONDITION       - copies the previously computed result from ctx->pRes
// Returns TSDB_CODE_SUCCESS or an error code.
int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t *rowNum) {
  switch (nodeType(node)) {
    case QUERY_NODE_LEFT_VALUE: {
      SSDataBlock* pb = taosArrayGetP(ctx->pBlockList, 0);
      param->numOfRows = pb->info.rows;
      break;
    }
    case QUERY_NODE_VALUE: {
      SValueNode *valueNode = (SValueNode *)node;

      ASSERT(param->columnData == NULL);
      param->numOfRows = 1;
      // Propagate allocation failures instead of appending to a NULL column.
      SCL_ERR_RET(sclCreateColumnInfoData(&valueNode->node.resType, 1, param));
      if (TSDB_DATA_TYPE_NULL == valueNode->node.resType.type || valueNode->isNull) {
        colDataAppendNULL(param->columnData, 0);
      } else {
        colDataAppend(param->columnData, 0, nodesGetValueFromNode(valueNode), false);
      }
      break;
    }
    case QUERY_NODE_NODE_LIST: {
      SNodeListNode *nodeList = (SNodeListNode *)node;
      if (LIST_LENGTH(nodeList->pNodeList) <= 0) {
        sclError("invalid length in nodeList, length:%d", LIST_LENGTH(nodeList->pNodeList));
        SCL_RET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      // Fall back to the list's own type when no conversion type is derived.
      int32_t type = vectorGetConvertType(ctx->type.selfType, ctx->type.peerType);
      if (type == 0) {
        type = nodeList->dataType.type;
      }

      SCL_ERR_RET(scalarGenerateSetFromList((void **)&param->pHashFilter, node, type));
      param->hashValueType = type;
      if (taosHashPut(ctx->pRes, &node, POINTER_BYTES, param, sizeof(*param))) {
        taosHashCleanup(param->pHashFilter);
        sclError("taosHashPut nodeList failed, size:%d", (int32_t)sizeof(*param));
        return TSDB_CODE_QRY_OUT_OF_MEMORY;
      }
      break;
    }
    case QUERY_NODE_COLUMN: {
      if (NULL == ctx->pBlockList) {
        sclError("invalid node type for constant calculating, type:%d, src:%p", nodeType(node), ctx->pBlockList);
        SCL_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
      }

      SColumnNode *ref = (SColumnNode *)node;

      // Locate the data block whose id matches the column reference.
      int32_t index = -1;
      for (int32_t i = 0; i < taosArrayGetSize(ctx->pBlockList); ++i) {
        SSDataBlock* pb = taosArrayGetP(ctx->pBlockList, i);
        if (pb->info.blockId == ref->dataBlockId) {
          index = i;
          break;
        }
      }

      if (index == -1) {
        sclError("column tupleId is too big, tupleId:%d, dataBlockNum:%d", ref->dataBlockId, (int32_t)taosArrayGetSize(ctx->pBlockList));
        SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      SSDataBlock *block = *(SSDataBlock **)taosArrayGet(ctx->pBlockList, index);
      if (NULL == block || ref->slotId >= taosArrayGetSize(block->pDataBlock)) {
        // block may be NULL on this path, so it must not be dereferenced for the log.
        int32_t dataBlockSize = (NULL == block) ? 0 : (int32_t)taosArrayGetSize(block->pDataBlock);
        sclError("column slotId is too big, slodId:%d, dataBlockSize:%d", ref->slotId, dataBlockSize);
        SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      SColumnInfoData *columnData = (SColumnInfoData *)taosArrayGet(block->pDataBlock, ref->slotId);
      param->numOfRows = block->info.rows;
      param->columnData = columnData;
      break;
    }
    case QUERY_NODE_FUNCTION:
    case QUERY_NODE_OPERATOR:
    case QUERY_NODE_LOGIC_CONDITION: {
      SScalarParam *res = (SScalarParam *)taosHashGet(ctx->pRes, &node, POINTER_BYTES);
      if (NULL == res) {
        sclError("no result for node, type:%d, node:%p", nodeType(node), node);
        SCL_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
      }
      *param = *res;
      break;
    }
    default:
      break;
  }

  if (param->numOfRows > *rowNum) {
    if ((1 != param->numOfRows) && (1 < *rowNum)) {
      sclError("different row nums, rowNum:%d, newRowNum:%d", *rowNum, param->numOfRows);
      SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    *rowNum = param->numOfRows;
  }

  param->param = ctx->param;
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
290
// Allocates and initialises the parameter array for a function or logic node.
//   pParams    - out: the allocated array (NULL when *rowNum ends up 0)
//   pParamList - parameter nodes; NULL means a single placeholder parameter
//   ctx        - scalar context
//   paramNum   - out: number of array entries
//   rowNum     - in/out: row count, updated by sclInitParam or, for a NULL
//                list, set from the first data block (1 when there is none)
// In constant-calculation mode only constant nodes are initialised, and each
// one is erased from pParamList afterwards; the array index still advances for
// skipped nodes.
// Returns TSDB_CODE_SUCCESS, or an error code after freeing the array.
int32_t sclInitParamList(SScalarParam **pParams, SNodeList* pParamList, SScalarCtx *ctx, int32_t *paramNum, int32_t *rowNum) {
  int32_t code = 0;

  if (NULL != pParamList) {
    *paramNum = pParamList->length;
  } else {
    if (ctx->pBlockList) {
      SSDataBlock *pFirst = taosArrayGetP(ctx->pBlockList, 0);
      *rowNum = pFirst->info.rows;
    } else {
      *rowNum = 1;
    }

    *paramNum = 1;
  }

  SScalarParam *pList = taosMemoryCalloc(*paramNum, sizeof(SScalarParam));
  if (NULL == pList) {
    sclError("calloc %d failed", (int32_t)((*paramNum) * sizeof(SScalarParam)));
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (NULL == pParamList) {
    pList[0].numOfRows = *rowNum;
  } else {
    SNode *pCur = NULL;
    int32_t idx = 0;
    if (SCL_IS_CONST_CALC(ctx)) {
      WHERE_EACH (pCur, pParamList) {
        if (SCL_IS_CONST_NODE(pCur)) {
          SCL_ERR_JRET(sclInitParam(pCur, &pList[idx], ctx, rowNum));
          ERASE_NODE(pParamList);
        } else {
          WHERE_NEXT;
        }

        ++idx;
      }
    } else {
      FOREACH(pCur, pParamList) {
        SCL_ERR_JRET(sclInitParam(pCur, &pList[idx], ctx, rowNum));
        ++idx;
      }
    }
  }

  // With no rows there is nothing to compute; hand back a NULL array.
  if (0 == *rowNum) {
    taosMemoryFreeClear(pList);
  }

  *pParams = pList;
  return TSDB_CODE_SUCCESS;

_return:
  taosMemoryFreeClear(pList);
  SCL_RET(code);
}

D
dapan1121 已提交
347 348 349 350 351
// Returns the data type an expression node evaluates to, or -1 when pNode is
// NULL, has an unhandled node type, or (for function/operator/logic nodes) has
// no computed result stored in ctx->pRes.
int32_t sclGetNodeType(SNode *pNode, SScalarCtx *ctx) {
  if (NULL != pNode) {
    switch ((int)nodeType(pNode)) {
      case QUERY_NODE_VALUE:
        return ((SValueNode *)pNode)->node.resType.type;
      case QUERY_NODE_NODE_LIST:
        return ((SNodeListNode *)pNode)->dataType.type;
      case QUERY_NODE_COLUMN:
        return ((SColumnNode *)pNode)->node.resType.type;
      case QUERY_NODE_FUNCTION:
      case QUERY_NODE_OPERATOR:
      case QUERY_NODE_LOGIC_CONDITION: {
        // The type of a computed node is taken from its stored result column.
        SScalarParam *pRes = (SScalarParam *)taosHashGet(ctx->pRes, &pNode, POINTER_BYTES);
        if (NULL != pRes) {
          return pRes->columnData->info.type;
        }
        sclError("no result for node, type:%d, node:%p", nodeType(pNode), pNode);
        return -1;
      }
      default:
        break;
    }
  }

  return -1;
}


// Records the operator's type information in ctx->type: selfType from the left
// operand, peerType from the right operand, and opResType from the operator's
// own result type.
void sclSetOperatorValueType(SOperatorNode *node, SScalarCtx *ctx) {
  ctx->type.selfType = sclGetNodeType(node->pLeft, ctx);
  ctx->type.peerType = sclGetNodeType(node->pRight, ctx);
  ctx->type.opResType = node->node.resType.type;
}

D
dapan1121 已提交
387 388
// Allocates and initialises the operand array (one or two entries) of an
// operator node.
//   pParams - out: the allocated array on success
//   node    - operator node; pLeft is required, pRight only for binary operators
//   ctx     - scalar context; its type info is refreshed for this operator
//   rowNum  - in/out row count maintained by sclInitParam
// Returns TSDB_CODE_SUCCESS, or an error code after freeing the array.
int32_t sclInitOperatorParams(SScalarParam **pParams, SOperatorNode *node, SScalarCtx *ctx, int32_t *rowNum) {
  int32_t code = 0;
  int32_t operandNum = scalarGetOperatorParamNum(node->opType);

  if (NULL == node->pLeft || (operandNum == 2 && NULL == node->pRight)) {
    sclError("invalid operation node, left:%p, right:%p", node->pLeft, node->pRight);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SScalarParam *pList = taosMemoryCalloc(operandNum, sizeof(SScalarParam));
  if (NULL == pList) {
    sclError("calloc %d failed", (int32_t)(operandNum * sizeof(SScalarParam)));
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  sclSetOperatorValueType(node, ctx);

  SCL_ERR_JRET(sclInitParam(node->pLeft, pList, ctx, rowNum));
  if (operandNum > 1) {
    // From the right operand's point of view self and peer types trade places.
    TSWAP(ctx->type.selfType, ctx->type.peerType);
    SCL_ERR_JRET(sclInitParam(node->pRight, pList + 1, ctx, rowNum));
  }

  *pParams = pList;
  return TSDB_CODE_SUCCESS;

_return:
  taosMemoryFreeClear(pList);
  SCL_RET(code);
}

417
// Evaluates a scalar function node.
//   node   - function node (built-in or user-defined)
//   ctx    - scalar context used to initialise the parameters
//   output - receives the result; for built-in functions a result column of
//            node's result type is created here before the call
// Returns TSDB_CODE_SUCCESS or the first error code. The parameter array is
// freed before returning; the column data it references is not released here.
int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *output) {
  SScalarParam *params = NULL;
  int32_t paramNum = 0;
  int32_t rowNum = 0;
  int32_t code = 0;
  SCL_ERR_RET(sclInitParamList(&params, node->pParameterList, ctx, &paramNum, &rowNum));

  if (!fmIsUserDefinedFunc(node->funcId)) {
    SScalarFuncExecFuncs execFns = {0};
    code = fmGetScalarFuncExecFuncs(node->funcId, &execFns);
    if (code) {
      sclError("fmGetFuncExecFuncs failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
      SCL_ERR_JRET(code);
    }

    SCL_ERR_JRET(sclCreateColumnInfoData(&node->node.resType, rowNum, output));

    code = (*execFns.process)(params, paramNum, output);
    if (code) {
      sclError("scalar function exec failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
      SCL_ERR_JRET(code);
    }
  } else {
    code = callUdfScalarFunc(node->functionName, params, paramNum, output);
    if (code != 0) {
      sclError("fmExecFunction error. callUdfScalarFunc. function name: %s, udf code:%d", node->functionName, code);
      goto _return;
    }
  }

_return:
  taosMemoryFreeClear(params);
  SCL_RET(code);
}

// Evaluates an AND / OR / NOT logic condition row by row.
//   node   - logic node; needs a non-empty parameter list, a BOOL result type
//            and, for NOT, exactly one parameter
//   ctx    - scalar context
//   output - receives a BOOL column with one entry per row. numOfRows is set to
//            0 when there is nothing to compute, or when in constant-calculation
//            mode the last row could not be fully evaluated.
// A row is written only when it is "complete": every parameter had column data,
// or AND met a false / OR met a true operand.
// Returns TSDB_CODE_SUCCESS or an error code.
int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *output) {
  if (NULL == node->pParameterList || node->pParameterList->length <= 0) {
    sclError("invalid logic parameter list, list:%p, paramNum:%d", node->pParameterList, node->pParameterList ? node->pParameterList->length : 0);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (TSDB_DATA_TYPE_BOOL != node->node.resType.type) {
    sclError("invalid logic resType, type:%d", node->node.resType.type);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (LOGIC_COND_TYPE_NOT == node->condType && node->pParameterList->length > 1) {
    sclError("invalid NOT operation parameter number, paramNum:%d", node->pParameterList->length);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SScalarParam *params = NULL;
  int32_t paramNum = 0;
  int32_t rowNum = 0;
  int32_t code = 0;
  SCL_ERR_RET(sclInitParamList(&params, node->pParameterList, ctx, &paramNum, &rowNum));
  if (NULL == params) {
    output->numOfRows = 0;
    return TSDB_CODE_SUCCESS;
  }

  int32_t type = node->node.resType.type;
  output->numOfRows = rowNum;

  SDataType t = {.type = type, .bytes = tDataTypes[type].bytes};
  SCL_ERR_JRET(sclCreateColumnInfoData(&t, rowNum, output));

  bool value = false;
  bool complete = true;
  for (int32_t row = 0; row < rowNum; ++row) {
    complete = true;
    for (int32_t k = 0; k < paramNum; ++k) {
      if (NULL == params[k].columnData) {
        // Operand without data: the row cannot be decided from it.
        complete = false;
        continue;
      }

      char* pCell = colDataGetData(params[k].columnData, row);
      GET_TYPED_DATA(value, bool, params[k].columnData->info.type, pCell);

      // Short-circuit: a false operand settles AND, a true operand settles OR.
      if ((LOGIC_COND_TYPE_AND == node->condType && (false == value)) ||
          (LOGIC_COND_TYPE_OR == node->condType && value)) {
        complete = true;
        break;
      }

      if (LOGIC_COND_TYPE_NOT == node->condType) {
        value = !value;
      }
    }

    if (complete) {
      colDataAppend(output->columnData, row, (char*) &value, false);
    }
  }

  if (SCL_IS_CONST_CALC(ctx) && (false == complete)) {
    sclFreeParam(output);
    output->numOfRows = 0;
  }

_return:
  taosMemoryFreeClear(params);
  SCL_RET(code);
}

// Evaluates an operator node.
//   node   - operator node (unary or binary, see scalarGetOperatorParamNum)
//   ctx    - scalar context used to initialise the operands
//   output - receives the result; a result column of node's result type is
//            created when output->columnData is NULL
// IN / NOT IN with a JSON-typed value on the left is rejected with
// TSDB_CODE_QRY_JSON_IN_ERROR. The operator function reports failure through
// terrno, which is reset before the call and returned afterwards.
// Operand columns marked SHOULD_FREE_COLDATA and the operand array are released
// before returning.
int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *output) {
  SScalarParam *params = NULL;
  int32_t rowNum = 0;
  int32_t code = 0;
  // Computed up front: the cleanup loop under _return reads it, and error paths
  // jump there before the operands are used.
  int32_t paramNum = scalarGetOperatorParamNum(node->opType);

  // json not support in in operator
  // A NULL left operand is reported by sclInitOperatorParams below, so it must
  // not be dereferenced here.
  if (NULL != node->pLeft && nodeType(node->pLeft) == QUERY_NODE_VALUE) {
    SValueNode *valueNode = (SValueNode *)node->pLeft;
    if (valueNode->node.resType.type == TSDB_DATA_TYPE_JSON && (node->opType == OP_TYPE_IN || node->opType == OP_TYPE_NOT_IN)) {
      SCL_RET(TSDB_CODE_QRY_JSON_IN_ERROR);
    }
  }

  SCL_ERR_RET(sclInitOperatorParams(&params, node, ctx, &rowNum));
  if (output->columnData == NULL) {
    code = sclCreateColumnInfoData(&node->node.resType, rowNum, output);
    if (code != TSDB_CODE_SUCCESS) {
      SCL_ERR_JRET(code);
    }
  }

  _bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(node->opType);

  SScalarParam* pLeft = &params[0];
  SScalarParam* pRight = paramNum > 1 ? &params[1] : NULL;

  terrno = TSDB_CODE_SUCCESS;
  OperatorFn(pLeft, pRight, output, TSDB_ORDER_ASC);
  code = terrno;

_return:
  for (int32_t i = 0; i < paramNum; ++i) {
    if (params[i].type == SHOULD_FREE_COLDATA) {
      colDataDestroy(params[i].columnData);
      taosMemoryFreeClear(params[i].columnData);
    }
  }

  taosMemoryFreeClear(params);
  SCL_RET(code);
}

D
dapan1121 已提交
581
// Replaces the operator node at *pNode with a constant value node: a
// NULL-typed value when opType <= OP_TYPE_CALC_MAX, otherwise a BOOL value of
// false. The original node is destroyed.
// Returns DEAL_RES_CONTINUE, or DEAL_RES_ERROR with ctx->code set to
// TSDB_CODE_QRY_OUT_OF_MEMORY when the value node cannot be created (the
// original node is then left in place).
EDealRes sclRewriteNullInOptr(SNode** pNode, SScalarCtx *ctx, EOperatorType opType) {
  SValueNode *pVal = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
  if (NULL == pVal) {
    sclError("make value node failed");    
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  if (opType <= OP_TYPE_CALC_MAX) {
    pVal->node.resType.type = TSDB_DATA_TYPE_NULL;
  } else {
    pVal->node.resType.type = TSDB_DATA_TYPE_BOOL;
    pVal->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes;
    pVal->datum.b = false;
  }

  nodesDestroyNode(*pNode);
  *pNode = (SNode*)pVal;

  return DEAL_RES_CONTINUE;
}

D
dapan1121 已提交
613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630
// Expression-walker callback. For each function node it stores in the bool
// pointed to by pContext whether that function is an aggregate, and stops the
// walk (DEAL_RES_END) as soon as one is found. Other node types are skipped.
EDealRes sclAggFuncWalker(SNode* pNode, void* pContext) {
  if (QUERY_NODE_FUNCTION != nodeType(pNode)) {
    return DEAL_RES_CONTINUE;
  }

  bool *pIsAgg = (bool *)pContext;
  *pIsAgg = fmIsAggFunc(((SFunctionNode *)pNode)->funcId);

  return *pIsAgg ? DEAL_RES_END : DEAL_RES_CONTINUE;
}


// Walks the expression rooted at pNode with sclAggFuncWalker and returns the
// flag that walker leaves behind (true once an aggregate function is found).
bool sclContainsAggFuncNode(SNode* pNode) {
  bool found = false;
  nodesWalkExpr(pNode, sclAggFuncWalker, &found);
  return found;
}
D
dapan1121 已提交
631

D
dapan 已提交
632
// Handles one side of an operator when that side is a value node.
//   pSelf - the operand being inspected, pPeer - the opposite operand
// A NULL-typed value (operator is neither IS NULL nor IS NOT NULL, peer has no
// aggregate function) rewrites the whole operator via sclRewriteNullInOptr.
// A string value facing a timestamp expression is converted to a timestamp.
// Returns true when the caller must return *pRes immediately, false to go on.
static bool sclRewriteValueOperand(SNode** pNode, SScalarCtx *ctx, SNode *pSelf, SNode *pPeer, EDealRes *pRes) {
  SOperatorNode *pOp = (SOperatorNode *)*pNode;

  if (NULL == pSelf || QUERY_NODE_VALUE != nodeType(pSelf)) {
    return false;
  }

  SValueNode *pVal = (SValueNode *)pSelf;
  if (SCL_IS_NULL_VALUE_NODE(pVal) && (pOp->opType != OP_TYPE_IS_NULL && pOp->opType != OP_TYPE_IS_NOT_NULL)
      && (!sclContainsAggFuncNode(pPeer))) {
    *pRes = sclRewriteNullInOptr(pNode, ctx, pOp->opType);
    return true;
  }

  if (IS_STR_DATA_TYPE(pVal->node.resType.type) && pPeer && nodesIsExprNode(pPeer)
      && ((SExprNode*)pPeer)->resType.type == TSDB_DATA_TYPE_TIMESTAMP) {
    int32_t code = sclConvertToTsValueNode(((SExprNode*)pPeer)->resType.precision, pVal);
    if (code) {
      ctx->code = code;
      *pRes = DEAL_RES_ERROR;
      return true;
    }
  }

  return false;
}

// Simplifies an operator whose operands are not all constant.
// Left and right value operands are processed by sclRewriteValueOperand (left
// first). For a node list on the right, NULL entries are erased for IN; for any
// other operator a NULL entry rewrites the whole operator, as does a list that
// ends up empty.
// Returns DEAL_RES_CONTINUE, the result of sclRewriteNullInOptr, or
// DEAL_RES_ERROR with ctx->code set when a timestamp conversion fails.
EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) {
  SOperatorNode *node = (SOperatorNode *)*pNode;
  EDealRes res = DEAL_RES_CONTINUE;

  if (sclRewriteValueOperand(pNode, ctx, node->pLeft, node->pRight, &res)) {
    return res;
  }

  if (sclRewriteValueOperand(pNode, ctx, node->pRight, node->pLeft, &res)) {
    return res;
  }

  if (node->pRight && (QUERY_NODE_NODE_LIST == nodeType(node->pRight))) {
    SNodeListNode *pList = (SNodeListNode *)node->pRight;
    SNode* pItem = NULL;
    WHERE_EACH(pItem, pList->pNodeList) {
      if (!SCL_IS_NULL_VALUE_NODE(pItem)) {
        WHERE_NEXT;
        continue;
      }

      if (node->opType != OP_TYPE_IN) { //OP_TYPE_NOT_IN
        return sclRewriteNullInOptr(pNode, ctx, node->opType);
      }

      ERASE_NODE(pList->pNodeList);
    }

    if (pList->pNodeList->length <= 0) {
      return sclRewriteNullInOptr(pNode, ctx, node->opType);
    }
  }

  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
694
// Constant-folds a scalar function whose parameters are all constant nodes.
// The function is executed once and the node at *pNode is replaced by a value
// node holding the result (NULL-typed when the first result row is NULL).
// Non-scalar functions and functions with a non-constant parameter are left
// untouched.
// Returns DEAL_RES_CONTINUE, or DEAL_RES_ERROR with ctx->code set when
// execution or an allocation fails; on error the original node stays in place.
EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) {
  SFunctionNode *node = (SFunctionNode *)*pNode;
  SNode* tnode = NULL;
  if (!fmIsScalarFunc(node->funcId)) {
    return DEAL_RES_CONTINUE;
  }

  FOREACH(tnode, node->pParameterList) {
    if (!SCL_IS_CONST_NODE(tnode)) {
      return DEAL_RES_CONTINUE;
    }
  }

  SScalarParam output = {0};

  ctx->code = sclExecFunction(node, ctx, &output);
  if (ctx->code) {
    return DEAL_RES_ERROR;
  }

  SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
  if (NULL == res) {
    sclError("make value node failed");
    sclFreeParam(&output);
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  res->translate = true;

  if (colDataIsNull_s(output.columnData, 0)) {
    res->node.resType.type = TSDB_DATA_TYPE_NULL;
  } else {
    res->node.resType.type = output.columnData->info.type;
    res->node.resType.bytes = output.columnData->info.bytes;
    res->node.resType.scale = output.columnData->info.scale;
    res->node.resType.precision = output.columnData->info.precision;
    int32_t type = output.columnData->info.type;
    if (type == TSDB_DATA_TYPE_JSON) {
      int32_t len = getJsonValueLen(output.columnData->pData);
      res->datum.p = taosMemoryCalloc(len, 1);
      if (NULL == res->datum.p) {
        goto _oom;
      }
      memcpy(res->datum.p, output.columnData->pData, len);
    } else if (IS_VAR_DATA_TYPE(type)) {
      res->datum.p = taosMemoryCalloc(res->node.resType.bytes + VARSTR_HEADER_SIZE + 1, 1);
      if (NULL == res->datum.p) {
        goto _oom;
      }
      memcpy(res->datum.p, output.columnData->pData, varDataTLen(output.columnData->pData));
    } else {
      nodesSetValueNodeValue(res, output.columnData->pData);
    }
  }

  nodesDestroyNode(*pNode);
  *pNode = (SNode*)res;

  sclFreeParam(&output);
  return DEAL_RES_CONTINUE;

_oom:
  // The copy buffer could not be allocated: drop the half-built value node and
  // the result column, and keep the original function node.
  sclError("calloc for value node data failed");
  nodesDestroyNode((SNode*)res);
  sclFreeParam(&output);
  ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
  return DEAL_RES_ERROR;
}

D
dapan 已提交
751
// Constant-folds a logic condition node. The condition is evaluated through
// sclExecLogic; when that yields rows, the node at *pNode is replaced by a
// value node carrying the result and the logic node's result type. When it
// yields zero rows the node is left as it is.
// Returns DEAL_RES_CONTINUE, or DEAL_RES_ERROR with ctx->code set.
EDealRes sclRewriteLogic(SNode** pNode, SScalarCtx *ctx) {
  SLogicConditionNode *pLogic = (SLogicConditionNode *)*pNode;
  SScalarParam output = {0};

  ctx->code = sclExecLogic(pLogic, ctx, &output);
  if (ctx->code) {
    return DEAL_RES_ERROR;
  }

  if (0 == output.numOfRows) {
    return DEAL_RES_CONTINUE;
  }

  SValueNode *pVal = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
  if (NULL == pVal) {
    sclError("make value node failed");
    sclFreeParam(&output);
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  pVal->node.resType = pLogic->node.resType;
  pVal->translate = true;

  SColumnInfoData *pCol = output.columnData;
  int32_t type = pCol->info.type;
  if (!IS_VAR_DATA_TYPE(type)) {
    nodesSetValueNodeValue(pVal, pCol->pData);
  } else {
    // Hand the buffer over to the value node instead of copying it.
    pVal->datum.p = pCol->pData;
    pCol->pData = NULL;
  }

  nodesDestroyNode(*pNode);
  *pNode = (SNode*)pVal;

  sclFreeParam(&output);
  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
790
// Constant-folds an operator node. When either operand is not constant the work
// is delegated to sclRewriteNonConstOperator. Otherwise the operator is
// executed once and the node at *pNode is replaced by a value node holding the
// result. A NULL result becomes a NULL-typed value, except for JSON result
// types, which keep their type and are flagged isNull.
// Returns DEAL_RES_CONTINUE, or DEAL_RES_ERROR with ctx->code set; on error the
// original node stays in place and the result column is released.
EDealRes sclRewriteOperator(SNode** pNode, SScalarCtx *ctx) {
  SOperatorNode *node = (SOperatorNode *)*pNode;

  if ((!SCL_IS_CONST_NODE(node->pLeft)) || (!SCL_IS_CONST_NODE(node->pRight))) {
    return sclRewriteNonConstOperator(pNode, ctx);
  }

  SScalarParam output = {0};
  ctx->code = sclExecOperator(node, ctx, &output);
  if (ctx->code) {
    // sclExecOperator may have created the result column before the operator
    // function failed; release it so the error path does not leak.
    sclFreeParam(&output);
    return DEAL_RES_ERROR;
  }

  SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
  if (NULL == res) {
    sclError("make value node failed");
    sclFreeParam(&output);
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  res->translate = true;

  if (colDataIsNull_s(output.columnData, 0)) {
    if (node->node.resType.type != TSDB_DATA_TYPE_JSON) {
      res->node.resType.type = TSDB_DATA_TYPE_NULL;
    } else {
      res->node.resType = node->node.resType;
      res->isNull = true;
    }
  } else {
    res->node.resType = node->node.resType;
    int32_t type = output.columnData->info.type;
    if (IS_VAR_DATA_TYPE(type)) {  // todo refactor
      // Hand the buffer over to the value node instead of copying it.
      res->datum.p = output.columnData->pData;
      output.columnData->pData = NULL;
    } else {
      nodesSetValueNodeValue(res, output.columnData->pData);
    }
  }

  nodesDestroyNode(*pNode);
  *pNode = (SNode*)res;

  sclFreeParam(&output);
  return DEAL_RES_CONTINUE;
}

/**
 * Rewriter callback used for constant calculation: dispatches a node to the
 * rewrite routine that matches its type.
 *
 * @param pNode    in/out: address of the node being visited.
 * @param pContext the SScalarCtx passed through the rewrite traversal.
 * @return the result of the type-specific rewrite, or DEAL_RES_CONTINUE for
 *         node types that are not functions, logic conditions or operators.
 */
EDealRes sclConstantsRewriter(SNode** pNode, void* pContext) {
  SScalarCtx *ctx = (SScalarCtx *)pContext;

  switch (nodeType(*pNode)) {
    case QUERY_NODE_FUNCTION:
      return sclRewriteFunction(pNode, ctx);
    case QUERY_NODE_LOGIC_CONDITION:
      return sclRewriteLogic(pNode, ctx);
    case QUERY_NODE_OPERATOR:
      return sclRewriteOperator(pNode, ctx);
    default:
      return DEAL_RES_CONTINUE;
  }
}

D
dapan 已提交
856
/**
 * Evaluate a function node and record its result in ctx->pRes, keyed by the
 * node pointer.
 *
 * @param pNode the function node to evaluate.
 * @param ctx   scalar context; ctx->code is set to the evaluation result or to
 *              TSDB_CODE_QRY_OUT_OF_MEMORY when the result cannot be stored.
 * @return DEAL_RES_CONTINUE on success, DEAL_RES_ERROR on failure.
 */
EDealRes sclWalkFunction(SNode* pNode, SScalarCtx *ctx) {
  SFunctionNode *node = (SFunctionNode *)pNode;
  SScalarParam output = {0};

  ctx->code = sclExecFunction(node, ctx, &output);
  if (ctx->code) {
    return DEAL_RES_ERROR;
  }

  output.type = DELEGATED_MGMT_COLDATA;
  if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
    // the result was not stored in the hash, so nothing else will release it
    sclFreeParam(&output);
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
874
/**
 * Evaluate a logic condition node and record its result in ctx->pRes, keyed by
 * the node pointer.
 *
 * @param pNode the logic condition node to evaluate.
 * @param ctx   scalar context; ctx->code is set to the evaluation result or to
 *              TSDB_CODE_QRY_OUT_OF_MEMORY when the result cannot be stored.
 * @return DEAL_RES_CONTINUE on success, DEAL_RES_ERROR on failure.
 */
EDealRes sclWalkLogic(SNode* pNode, SScalarCtx *ctx) {
  SLogicConditionNode *node = (SLogicConditionNode *)pNode;
  SScalarParam output = {0};

  ctx->code = sclExecLogic(node, ctx, &output);
  if (ctx->code) {
    return DEAL_RES_ERROR;
  }

  if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
    // the result was not stored in the hash, so nothing else will release it
    sclFreeParam(&output);
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
891
/**
 * Evaluate an operator node and record its result in ctx->pRes, keyed by the
 * node pointer.
 *
 * @param pNode the operator node to evaluate.
 * @param ctx   scalar context; ctx->code is set to the evaluation result or to
 *              TSDB_CODE_QRY_OUT_OF_MEMORY when the result cannot be stored.
 * @return DEAL_RES_CONTINUE on success, DEAL_RES_ERROR on failure.
 */
EDealRes sclWalkOperator(SNode* pNode, SScalarCtx *ctx) {
  SOperatorNode *node = (SOperatorNode *)pNode;
  SScalarParam output = {0};

  ctx->code = sclExecOperator(node, ctx, &output);
  if (ctx->code) {
    return DEAL_RES_ERROR;
  }

  output.type = DELEGATED_MGMT_COLDATA;
  if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
    // the result was not stored in the hash, so nothing else will release it
    sclFreeParam(&output);
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  return DEAL_RES_CONTINUE;
}

909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950
/**
 * Copy the computed result of a target's expression into the target's slot.
 *
 * The data block is located in ctx->pBlockList by matching its blockId with
 * target->dataBlockId. The result stored in ctx->pRes for target->pExpr is
 * then assigned to the column at target->slotId, the block's row count is set
 * to the result's row count, and the result is freed and removed from the hash.
 *
 * @param pNode the target node.
 * @param ctx   scalar context; ctx->code is set on failure.
 * @return DEAL_RES_CONTINUE on success, DEAL_RES_ERROR when the block, the
 *         slot or the expression result cannot be found.
 */
EDealRes sclWalkTarget(SNode* pNode, SScalarCtx *ctx) {
  STargetNode *target = (STargetNode *)pNode;

  if (target->dataBlockId >= taosArrayGetSize(ctx->pBlockList)) {
    sclError("target tupleId is too big, tupleId:%d, dataBlockNum:%d", target->dataBlockId, (int32_t)taosArrayGetSize(ctx->pBlockList));
    ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
    return DEAL_RES_ERROR;
  }

  // look up the block whose id matches the target's data block id
  SSDataBlock *block = NULL;
  for (int32_t i = 0; i < taosArrayGetSize(ctx->pBlockList); ++i) {
    SSDataBlock *candidate = taosArrayGetP(ctx->pBlockList, i);
    if (candidate->info.blockId == target->dataBlockId) {
      block = candidate;
      break;
    }
  }

  if (NULL == block) {
    sclError("column tupleId is too big, tupleId:%d, dataBlockNum:%d", target->dataBlockId, (int32_t)taosArrayGetSize(ctx->pBlockList));
    ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
    return DEAL_RES_ERROR;
  }

  // the slot must exist in the block before it can be used as destination
  if (target->slotId >= taosArrayGetSize(block->pDataBlock)) {
    sclError("target slot not exist, dataBlockId:%d, slotId:%d, dataBlockNum:%d", target->dataBlockId, target->slotId, (int32_t)taosArrayGetSize(block->pDataBlock));
    ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
    return DEAL_RES_ERROR;
  }

  SColumnInfoData *dstCol = taosArrayGet(block->pDataBlock, target->slotId);

  SScalarParam *exprRes = (SScalarParam *)taosHashGet(ctx->pRes, (void *)&target->pExpr, POINTER_BYTES);
  if (NULL == exprRes) {
    sclError("no valid res in hash, node:%p, type:%d", target->pExpr, nodeType(target->pExpr));
    ctx->code = TSDB_CODE_QRY_APP_ERROR;
    return DEAL_RES_ERROR;
  }

  colDataAssign(dstCol, exprRes->columnData, exprRes->numOfRows, NULL);
  block->info.rows = exprRes->numOfRows;

  // the result has been consumed: release it and drop it from the hash
  sclFreeParam(exprRes);
  taosHashRemove(ctx->pRes, (void *)&target->pExpr, POINTER_BYTES);
  return DEAL_RES_CONTINUE;
}
D
dapan 已提交
958

D
dapan1121 已提交
959
/**
 * Walker callback used for scalar calculation: dispatches a node to the walk
 * routine that matches its type.
 *
 * Value, node-list, column and left-value nodes need no calculation and are
 * skipped. Any node type other than function, logic condition, operator or
 * target is reported as invalid input.
 *
 * @param pNode    the node being visited.
 * @param pContext the SScalarCtx passed through the traversal.
 * @return the result of the type-specific walk, DEAL_RES_CONTINUE for skipped
 *         nodes, or DEAL_RES_ERROR (with ctx->code set) for unsupported types.
 */
EDealRes sclCalcWalker(SNode* pNode, void* pContext) {
  SScalarCtx *ctx = (SScalarCtx *)pContext;

  switch (nodeType(pNode)) {
    case QUERY_NODE_VALUE:
    case QUERY_NODE_NODE_LIST:
    case QUERY_NODE_COLUMN:
    case QUERY_NODE_LEFT_VALUE:
      return DEAL_RES_CONTINUE;
    case QUERY_NODE_FUNCTION:
      return sclWalkFunction(pNode, ctx);
    case QUERY_NODE_LOGIC_CONDITION:
      return sclWalkLogic(pNode, ctx);
    case QUERY_NODE_OPERATOR:
      return sclWalkOperator(pNode, ctx);
    case QUERY_NODE_TARGET:
      return sclWalkTarget(pNode, ctx);
    default:
      break;
  }

  sclError("invalid node type for scalar calculating, type:%d", nodeType(pNode));
  ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
  return DEAL_RES_ERROR;
}

D
dapan1121 已提交
986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004
/**
 * Extend a result into pDst using the row count of the first data block.
 *
 * The capacity of pDst->columnData is ensured for that row count, then the
 * OP_TYPE_ASSIGN binary operator is invoked with a left operand that carries
 * only the row count and pSrc as the right operand.
 *
 * @param pDst       destination parameter; its columnData must be set.
 * @param pSrc       source result passed as the right operand.
 * @param pBlockList block list; the first block supplies the row count.
 * @return TSDB_CODE_SUCCESS.
 */
int32_t sclExtendResRows(SScalarParam *pDst, SScalarParam *pSrc, SArray *pBlockList) {
  SSDataBlock* pb = taosArrayGetP(pBlockList, 0);

  // The left operand only carries the row count and does not outlive this
  // call, so a stack object is enough; no allocation that could fail.
  SScalarParam left = {0};
  left.numOfRows = pb->info.rows;

  colInfoDataEnsureCapacity(pDst->columnData, pb->info.rows);

  _bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(OP_TYPE_ASSIGN);
  OperatorFn(&left, pSrc, pDst, TSDB_ORDER_ASC);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1005 1006 1007 1008 1009 1010
/**
 * Calculate the constant parts of an expression tree by rewriting it in
 * post-order with sclConstantsRewriter.
 *
 * @param pNode root of the expression tree; must not be NULL.
 * @param pRes  out: receives the (possibly replaced) root on success; must not
 *              be NULL.
 * @return 0 on success, TSDB_CODE_QRY_INVALID_INPUT for NULL arguments,
 *         TSDB_CODE_QRY_OUT_OF_MEMORY when the result hash cannot be created,
 *         or the error code recorded by the rewriter.
 */
int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes) {
  // pRes is written unconditionally on success, so reject NULL up front
  if (NULL == pNode || NULL == pRes) {
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t code = 0;
  SScalarCtx ctx = {0};
  ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (NULL == ctx.pRes) {
    sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM);
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  nodesRewriteExprPostOrder(&pNode, sclConstantsRewriter, (void *)&ctx);
  SCL_ERR_JRET(ctx.code);
  *pRes = pNode;

_return:
  sclFreeRes(ctx.pRes);
  return code;
}

D
dapan 已提交
1027
/**
 * Evaluate an expression tree against a list of data blocks.
 *
 * The tree is walked in post-order with sclCalcWalker. When pDst is given, the
 * result recorded for the root node is copied into it: a single-row result is
 * extended through sclExtendResRows(), any other result is assigned directly.
 *
 * @param pNode      root of the expression tree; must not be NULL.
 * @param pBlockList data blocks used as input; must not be NULL.
 * @param pDst       optional destination for the root result.
 * @return 0 on success, TSDB_CODE_QRY_INVALID_INPUT for NULL arguments,
 *         TSDB_CODE_QRY_OUT_OF_MEMORY when the result hash cannot be created,
 *         TSDB_CODE_QRY_APP_ERROR when the root has no recorded result, or the
 *         error code recorded during the walk.
 */
int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
  if (NULL == pBlockList || NULL == pNode) {
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t    code = 0;
  SScalarCtx ctx = {.code = 0, .pBlockList = pBlockList, .param = pDst ? pDst->param : NULL};

  // TODO: OPT performance
  ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (NULL == ctx.pRes) {
    sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM);
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  nodesWalkExprPostOrder(pNode, sclCalcWalker, (void *)&ctx);
  SCL_ERR_JRET(ctx.code);

  if (NULL != pDst) {
    SScalarParam *rootRes = (SScalarParam *)taosHashGet(ctx.pRes, (void *)&pNode, POINTER_BYTES);
    if (NULL == rootRes) {
      sclError("no valid res in hash, node:%p, type:%d", pNode, nodeType(pNode));
      SCL_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }

    if (1 != rootRes->numOfRows) {
      colInfoDataEnsureCapacity(pDst->columnData, rootRes->numOfRows);
      colDataAssign(pDst->columnData, rootRes->columnData, rootRes->numOfRows, NULL);
      pDst->numOfRows = rootRes->numOfRows;
    } else {
      // a single-row result is extended via sclExtendResRows
      SCL_ERR_JRET(sclExtendResRows(pDst, rootRes, pBlockList));
    }

    // the root result has been copied out: release it and drop it from the hash
    sclFreeParam(rootRes);
    taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES);
  }

_return:
  // pNode is not destroyed here
  sclFreeRes(ctx.pRes);
  return code;
}
D
dapan1121 已提交
1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137

/**
 * Determine the result data type of an operator from its operand types.
 *
 * Only pRes->type is written; the other fields of *pRes are left untouched.
 *
 * @param left  data type of the left operand.
 * @param right data type of the right operand.
 * @param op    operator type.
 * @param pRes  out: receives the result type.
 * @return TSDB_CODE_SUCCESS on success, TSDB_CODE_TSC_INVALID_OPERATION when
 *         both operands of ADD, MULTI or DIV are timestamps, or
 *         TSDB_CODE_APP_ERROR (after ASSERT) for an unhandled operator.
 */
int32_t scalarGetOperatorResultType(SDataType left, SDataType right, EOperatorType op, SDataType* pRes) {
  switch (op) {
    case OP_TYPE_ADD:
      if (left.type == TSDB_DATA_TYPE_TIMESTAMP && right.type == TSDB_DATA_TYPE_TIMESTAMP) {
        qError("invalid op %d, left type:%d, right type:%d", op, left.type, right.type);
        return TSDB_CODE_TSC_INVALID_OPERATION;
      }
      // timestamp combined with an integer or bool stays a timestamp
      if ((left.type == TSDB_DATA_TYPE_TIMESTAMP && (IS_INTEGER_TYPE(right.type) || right.type == TSDB_DATA_TYPE_BOOL)) ||
          (right.type == TSDB_DATA_TYPE_TIMESTAMP && (IS_INTEGER_TYPE(left.type) || left.type == TSDB_DATA_TYPE_BOOL))) {
        pRes->type = TSDB_DATA_TYPE_TIMESTAMP;
        return TSDB_CODE_SUCCESS;
      }
      pRes->type = TSDB_DATA_TYPE_DOUBLE;
      return TSDB_CODE_SUCCESS;
    case OP_TYPE_SUB:
      if ((left.type == TSDB_DATA_TYPE_TIMESTAMP && right.type == TSDB_DATA_TYPE_BIGINT) ||
          (right.type == TSDB_DATA_TYPE_TIMESTAMP && left.type == TSDB_DATA_TYPE_BIGINT)) {
        pRes->type = TSDB_DATA_TYPE_TIMESTAMP;
        return TSDB_CODE_SUCCESS;
      }
      pRes->type = TSDB_DATA_TYPE_DOUBLE;
      return TSDB_CODE_SUCCESS;
    case OP_TYPE_MULTI:
    case OP_TYPE_DIV:
      if (left.type == TSDB_DATA_TYPE_TIMESTAMP && right.type == TSDB_DATA_TYPE_TIMESTAMP) {
        qError("invalid op %d, left type:%d, right type:%d", op, left.type, right.type);
        return TSDB_CODE_TSC_INVALID_OPERATION;
      }
      /* fallthrough: valid MULTI/DIV share the DOUBLE result below */
    case OP_TYPE_REM:
    case OP_TYPE_MINUS:
      pRes->type = TSDB_DATA_TYPE_DOUBLE;
      return TSDB_CODE_SUCCESS;
    case OP_TYPE_GREATER_THAN:
    case OP_TYPE_GREATER_EQUAL:
    case OP_TYPE_LOWER_THAN:
    case OP_TYPE_LOWER_EQUAL:
    case OP_TYPE_EQUAL:
    case OP_TYPE_NOT_EQUAL:
    case OP_TYPE_IN:
    case OP_TYPE_NOT_IN:
    case OP_TYPE_LIKE:
    case OP_TYPE_NOT_LIKE:
    case OP_TYPE_MATCH:
    case OP_TYPE_NMATCH:
    case OP_TYPE_IS_NULL:
    case OP_TYPE_IS_NOT_NULL:
    case OP_TYPE_IS_TRUE:
    // the remaining IS_* predicates are accepted by scalarGetOperatorParamNum
    // and must not fall into the ASSERT below
    case OP_TYPE_IS_NOT_TRUE:
    case OP_TYPE_IS_FALSE:
    case OP_TYPE_IS_NOT_FALSE:
    case OP_TYPE_IS_UNKNOWN:
    case OP_TYPE_IS_NOT_UNKNOWN:
    case OP_TYPE_JSON_CONTAINS:
      pRes->type = TSDB_DATA_TYPE_BOOL;
      return TSDB_CODE_SUCCESS;
    case OP_TYPE_BIT_AND:
    case OP_TYPE_BIT_OR:
      pRes->type = TSDB_DATA_TYPE_BIGINT;
      return TSDB_CODE_SUCCESS;
    case OP_TYPE_JSON_GET_VALUE:
      pRes->type = TSDB_DATA_TYPE_JSON;
      return TSDB_CODE_SUCCESS;
    default:
      ASSERT(0);
      return TSDB_CODE_APP_ERROR;
  }
}