scalar.c 24.2 KB
Newer Older
D
dapan1121 已提交
1 2
#include "function.h"
#include "functionMgt.h"
H
Haojun Liao 已提交
3 4
#include "nodes.h"
#include "querynodes.h"
D
dapan1121 已提交
5
#include "sclInt.h"
H
Haojun Liao 已提交
6 7 8
#include "sclvector.h"
#include "tcommon.h"
#include "tdatablock.h"
9
#include "scalar.h"
D
dapan1121 已提交
10

D
dapan1121 已提交
11
int32_t scalarGetOperatorParamNum(EOperatorType type) {
D
dapan1121 已提交
12
  if (OP_TYPE_IS_NULL == type || OP_TYPE_IS_NOT_NULL == type || OP_TYPE_IS_TRUE == type || OP_TYPE_IS_NOT_TRUE == type 
D
dapan1121 已提交
13 14
   || OP_TYPE_IS_FALSE == type || OP_TYPE_IS_NOT_FALSE == type || OP_TYPE_IS_UNKNOWN == type || OP_TYPE_IS_NOT_UNKNOWN == type
   || OP_TYPE_MINUS == type) {
D
dapan1121 已提交
15 16 17 18 19 20
    return 1;
  }

  return 2;
}

21
SColumnInfoData* createColumnInfoData(SDataType* pType, int32_t numOfRows) {
H
Haojun Liao 已提交
22
  SColumnInfoData* pColumnData = taosMemoryCalloc(1, sizeof(SColumnInfoData));
23 24 25 26 27 28 29 30 31 32
  if (pColumnData == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pColumnData->info.type      = pType->type;
  pColumnData->info.bytes     = pType->bytes;
  pColumnData->info.scale     = pType->scale;
  pColumnData->info.precision = pType->precision;

33
  int32_t code = colInfoDataEnsureCapacity(pColumnData, 0, numOfRows);
34 35
  if (code != TSDB_CODE_SUCCESS) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
H
Haojun Liao 已提交
36
    taosMemoryFree(pColumnData);
37 38 39 40 41 42
    return NULL;
  } else {
    return pColumnData;
  }
}

43 44 45 46
int32_t doConvertDataType(SValueNode* pValueNode, SScalarParam* out) {
  SScalarParam in = {.numOfRows = 1};
  in.columnData = createColumnInfoData(&pValueNode->node.resType, 1);
  colDataAppend(in.columnData, 0, nodesGetValueFromNode(pValueNode), false);
47

48
  colInfoDataEnsureCapacity(out->columnData, 0, 1);
49 50 51 52
  int32_t code = vectorConvertImpl(&in, out);
  sclFreeParam(&in);

  return code;
53 54
}

D
dapan1121 已提交
55 56 57 58 59 60 61 62 63 64 65 66
int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type) {
  SHashObj *pObj = taosHashInit(256, taosGetDefaultHashFunction(type), true, false);
  if (NULL == pObj) {
    sclError("taosHashInit failed, size:%d", 256);
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  taosHashSetEqualFp(pObj, taosGetDefaultEqualFunction(type)); 

  int32_t code = 0;
  SNodeListNode *nodeList = (SNodeListNode *)pNode;
  SListCell *cell = nodeList->pNodeList->pHead;
H
Haojun Liao 已提交
67
  SScalarParam out = {.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData))};
68

D
dapan1121 已提交
69 70 71 72 73 74 75
  int32_t len = 0;
  void *buf = NULL;
  
  for (int32_t i = 0; i < nodeList->pNodeList->length; ++i) {
    SValueNode *valueNode = (SValueNode *)cell->pNode;

    if (valueNode->node.resType.type != type) {
76 77 78
      out.columnData->info.type = type;
      out.columnData->info.bytes = tDataTypes[type].bytes;

79 80
      code = doConvertDataType(valueNode, &out);
      if (code != TSDB_CODE_SUCCESS) {
81
//        sclError("convert data from %d to %d failed", in.type, out.type);
D
dapan1121 已提交
82 83 84 85
        SCL_ERR_JRET(code);
      }

      if (IS_VAR_DATA_TYPE(type)) {
86 87
        len = varDataLen(out.columnData->pData);
        buf = varDataVal(out.columnData->pData);
D
dapan1121 已提交
88 89
      } else {
        len = tDataTypes[type].bytes;
90
        buf = out.columnData->pData;
D
dapan1121 已提交
91 92 93
      }
    } else {
      buf = nodesGetValueFromNode(valueNode);
D
dapan1121 已提交
94 95 96 97 98
      if (IS_VAR_DATA_TYPE(type)) {
        len = varDataLen(buf);
        buf = varDataVal(buf);
      } else {
        len = valueNode->node.resType.bytes;
99
      }
D
dapan1121 已提交
100 101
    }
    
102
    if (taosHashPut(pObj, buf, (size_t)len, NULL, 0)) {
D
dapan1121 已提交
103 104 105
      sclError("taosHashPut failed");
      SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
D
dapan1121 已提交
106 107

    cell = cell->pNext;
D
dapan1121 已提交
108 109 110 111 112 113 114 115 116 117
  }

  *data = pObj;
  return TSDB_CODE_SUCCESS;

_return:
  taosHashCleanup(pObj);
  SCL_RET(code);
}

D
dapan1121 已提交
118 119 120 121 122 123 124
void sclFreeRes(SHashObj *res) {
  SScalarParam *p = NULL;
  void *pIter = taosHashIterate(res, NULL);
  while (pIter) {
    p = (SScalarParam *)pIter;

    if (p) {
D
dapan 已提交
125
      sclFreeParam(p);
D
dapan1121 已提交
126 127 128 129 130 131
    }
    pIter = taosHashIterate(res, pIter);
  }
  taosHashCleanup(res);
}

D
dapan1121 已提交
132
void sclFreeParam(SScalarParam *param) {
133 134
  if (param->columnData != NULL) {
    colDataDestroy(param->columnData);
wmmhello's avatar
wmmhello 已提交
135
    taosMemoryFree(param->columnData);
136 137 138 139 140
  }

  if (param->pHashFilter != NULL) {
    taosHashCleanup(param->pHashFilter);
  }
D
dapan1121 已提交
141 142
}

D
dapan1121 已提交
143 144 145 146 147
int32_t sclCopyValueNodeValue(SValueNode *pNode, void **res) {
  if (TSDB_DATA_TYPE_NULL == pNode->node.resType.type) {
    return TSDB_CODE_SUCCESS;
  }
  
wafwerar's avatar
wafwerar 已提交
148
  *res = taosMemoryMalloc(pNode->node.resType.bytes);
D
dapan1121 已提交
149 150 151 152 153 154 155 156 157
  if (NULL == (*res)) {
    sclError("malloc %d failed", pNode->node.resType.bytes);
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  memcpy(*res, nodesGetValueFromNode(pNode), pNode->node.resType.bytes);
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
158 159 160 161
int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t *rowNum) {
  switch (nodeType(node)) {
    case QUERY_NODE_VALUE: {
      SValueNode *valueNode = (SValueNode *)node;
162 163 164 165

      param->numOfRows = 1;
      param->columnData = createColumnInfoData(&valueNode->node.resType, 1);
      if (TSDB_DATA_TYPE_NULL == valueNode->node.resType.type) {
166
        colDataAppendNULL(param->columnData, 0);
167 168
      } else {
        colDataAppend(param->columnData, 0, nodesGetValueFromNode(valueNode), false);
D
dapan1121 已提交
169
      }
D
dapan1121 已提交
170 171
      break;
    }
D
dapan1121 已提交
172 173
    case QUERY_NODE_NODE_LIST: {
      SNodeListNode *nodeList = (SNodeListNode *)node;
174 175
      if (LIST_LENGTH(nodeList->pNodeList) <= 0) {
        sclError("invalid length in nodeList, length:%d", LIST_LENGTH(nodeList->pNodeList));
D
dapan1121 已提交
176 177 178
        SCL_RET(TSDB_CODE_QRY_INVALID_INPUT);
      }

179
      SCL_ERR_RET(scalarGenerateSetFromList((void**) &param->pHashFilter, node, nodeList->dataType.type));
D
dapan 已提交
180
      if (taosHashPut(ctx->pRes, &node, POINTER_BYTES, param, sizeof(*param))) {
181
        taosHashCleanup(param->pHashFilter);
D
dapan 已提交
182 183 184
        sclError("taosHashPut nodeList failed, size:%d", (int32_t)sizeof(*param));
        return TSDB_CODE_QRY_OUT_OF_MEMORY;
      }   
D
dapan1121 已提交
185 186
      break;
    }
X
Xiaoyu Wang 已提交
187
    case QUERY_NODE_COLUMN: {
D
dapan 已提交
188 189
      if (NULL == ctx->pBlockList) {
        sclError("invalid node type for constant calculating, type:%d, src:%p", nodeType(node), ctx->pBlockList);
D
dapan1121 已提交
190 191 192
        SCL_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
      }
      
X
Xiaoyu Wang 已提交
193
      SColumnNode *ref = (SColumnNode *)node;
194 195 196 197 198 199 200 201 202 203 204

      int32_t index = -1;
      for(int32_t i = 0; i < taosArrayGetSize(ctx->pBlockList); ++i) {
        SSDataBlock* pb = taosArrayGetP(ctx->pBlockList, i);
        if (pb->info.blockId == ref->dataBlockId) {
          index = i;
          break;
        }
      }

      if (index == -1) {
D
dapan1121 已提交
205
        sclError("column tupleId is too big, tupleId:%d, dataBlockNum:%d", ref->dataBlockId, (int32_t)taosArrayGetSize(ctx->pBlockList));
D
dapan 已提交
206 207 208
        SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
      }

209 210
      SSDataBlock *block = *(SSDataBlock **)taosArrayGet(ctx->pBlockList, index);
      if (NULL == block || ref->slotId >= block->info.numOfCols) {
D
dapan 已提交
211
        sclError("column slotId is too big, slodId:%d, dataBlockSize:%d", ref->slotId, (int32_t)taosArrayGetSize(block->pDataBlock));
D
dapan1121 已提交
212 213 214
        SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
      }

D
dapan 已提交
215
      SColumnInfoData *columnData = (SColumnInfoData *)taosArrayGet(block->pDataBlock, ref->slotId);
216 217
      param->numOfRows = block->info.rows;
      param->columnData = columnData;
D
dapan1121 已提交
218 219
      break;
    }
220 221 222
    case QUERY_NODE_FUNCTION:
    case QUERY_NODE_OPERATOR:
    case QUERY_NODE_LOGIC_CONDITION: {
D
dapan1121 已提交
223 224 225 226 227 228 229 230
      SScalarParam *res = (SScalarParam *)taosHashGet(ctx->pRes, &node, POINTER_BYTES);
      if (NULL == res) {
        sclError("no result for node, type:%d, node:%p", nodeType(node), node);
        SCL_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
      }
      *param = *res;
      break;
    }
231 232
    default:
      break;
D
dapan1121 已提交
233 234
  }

235 236 237
  if (param->numOfRows > *rowNum) {
    if ((1 != param->numOfRows) && (1 < *rowNum)) {
      sclError("different row nums, rowNum:%d, newRowNum:%d", *rowNum, param->numOfRows);
D
dapan1121 已提交
238 239 240
      SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
    }
    
241
    *rowNum = param->numOfRows;
D
dapan1121 已提交
242 243 244 245 246
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
247
int32_t sclInitParamList(SScalarParam **pParams, SNodeList* pParamList, SScalarCtx *ctx, int32_t *paramNum, int32_t *rowNum) {  
D
dapan1121 已提交
248
  int32_t code = 0;
D
dapan1121 已提交
249 250 251 252 253 254 255 256
  if (NULL == pParamList) {
    if (ctx->pBlockList) {
      SSDataBlock *pBlock = taosArrayGet(ctx->pBlockList, 0);
      *rowNum = pBlock->info.rows;
    } else {
      *rowNum = 1;
    }

D
dapan 已提交
257
    *paramNum = 1;
D
dapan1121 已提交
258
  } else {
D
dapan 已提交
259
    *paramNum = pParamList->length;
D
dapan1121 已提交
260 261
  }

D
dapan 已提交
262
  SScalarParam *paramList = taosMemoryCalloc(*paramNum, sizeof(SScalarParam));
D
dapan1121 已提交
263
  if (NULL == paramList) {
D
dapan 已提交
264
    sclError("calloc %d failed", (int32_t)((*paramNum) * sizeof(SScalarParam)));
D
dapan1121 已提交
265
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
D
dapan1121 已提交
266 267
  }

D
dapan1121 已提交
268 269 270 271 272 273
  if (pParamList) {
    SNode *tnode = NULL;
    int32_t i = 0;
    if (SCL_IS_CONST_CALC(ctx)) {
      WHERE_EACH (tnode, pParamList) { 
        if (!SCL_IS_CONST_NODE(tnode)) {
D
dapan 已提交
274
          WHERE_NEXT;
D
dapan1121 已提交
275 276 277 278 279 280 281 282 283 284 285 286
        } else {
          SCL_ERR_JRET(sclInitParam(tnode, &paramList[i], ctx, rowNum));
          ERASE_NODE(pParamList);
        }
        
        ++i;
      }
    } else {
      FOREACH(tnode, pParamList) { 
        SCL_ERR_JRET(sclInitParam(tnode, &paramList[i], ctx, rowNum));
        ++i;
      }
D
dapan1121 已提交
287
    }
D
dapan1121 已提交
288 289 290
  } else {
    paramList[0].numOfRows = *rowNum;
  }
D
dapan1121 已提交
291

D
dapan1121 已提交
292 293
  if (0 == *rowNum) {
    taosMemoryFreeClear(paramList);    
D
dapan1121 已提交
294
  }
D
dapan1121 已提交
295

D
dapan1121 已提交
296
  *pParams = paramList;
D
dapan1121 已提交
297
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
298

D
dapan1121 已提交
299
_return:
wafwerar's avatar
wafwerar 已提交
300
  taosMemoryFreeClear(paramList);
D
dapan1121 已提交
301 302 303 304 305
  SCL_RET(code);
}

int32_t sclInitOperatorParams(SScalarParam **pParams, SOperatorNode *node, SScalarCtx *ctx, int32_t *rowNum) {
  int32_t code = 0;
D
dapan1121 已提交
306
  int32_t paramNum = scalarGetOperatorParamNum(node->opType);
D
dapan1121 已提交
307 308 309 310 311
  if (NULL == node->pLeft || (paramNum == 2 && NULL == node->pRight)) {
    sclError("invalid operation node, left:%p, right:%p", node->pLeft, node->pRight);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }
  
wafwerar's avatar
wafwerar 已提交
312
  SScalarParam *paramList = taosMemoryCalloc(paramNum, sizeof(SScalarParam));
D
dapan1121 已提交
313 314
  if (NULL == paramList) {
    sclError("calloc %d failed", (int32_t)(paramNum * sizeof(SScalarParam)));
D
dapan1121 已提交
315 316 317
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

D
dapan1121 已提交
318
  SCL_ERR_JRET(sclInitParam(node->pLeft, &paramList[0], ctx, rowNum));
D
dapan1121 已提交
319
  if (paramNum > 1) {
D
dapan1121 已提交
320
    SCL_ERR_JRET(sclInitParam(node->pRight, &paramList[1], ctx, rowNum));
D
dapan1121 已提交
321 322
  }

D
dapan1121 已提交
323
  *pParams = paramList;
D
dapan1121 已提交
324
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
325 326

_return:
wafwerar's avatar
wafwerar 已提交
327
  taosMemoryFreeClear(paramList);
D
dapan1121 已提交
328
  SCL_RET(code);
D
dapan1121 已提交
329 330
}

331
int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *output) {
D
dapan1121 已提交
332 333
  SScalarParam *params = NULL;
  int32_t rowNum = 0;
D
dapan 已提交
334
  int32_t paramNum = 0;
D
dapan1121 已提交
335
  int32_t code = 0;
D
dapan 已提交
336
  SCL_ERR_RET(sclInitParamList(&params, node->pParameterList, ctx, &paramNum, &rowNum));
D
dapan1121 已提交
337

D
dapan1121 已提交
338 339 340 341 342 343 344
  if (fmIsUserDefinedFunc(node->funcId)) {
#if 0  
    UdfcFuncHandle udfHandle = NULL;
    
    SCL_ERR_JRET(setupUdf(node->functionName, &udfHandle));
    code = callUdfScalarFunc(udfHandle, params, paramNum, output);
    teardownUdf(udfHandle);
345
    SCL_ERR_JRET(code);
D
dapan1121 已提交
346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365
#endif    
  } else {
    SScalarFuncExecFuncs ffpSet = {0};
    code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet);
    if (code) {
      sclError("fmGetFuncExecFuncs failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
      SCL_ERR_JRET(code);
    }
  
    output->columnData = createColumnInfoData(&node->node.resType, rowNum);
    if (output->columnData == NULL) {
      sclError("calloc %d failed", (int32_t)(rowNum * output->columnData->info.bytes));
      SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    code = (*ffpSet.process)(params, paramNum, output);
    if (code) {
      sclError("scalar function exec failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
      SCL_ERR_JRET(code);
    }
D
dapan1121 已提交
366 367 368 369
  }

_return:

D
dapan 已提交
370
  for (int32_t i = 0; i < paramNum; ++i) {
H
Haojun Liao 已提交
371
//    sclFreeParamNoData(params + i);
D
dapan1121 已提交
372 373
  }

wafwerar's avatar
wafwerar 已提交
374
  taosMemoryFreeClear(params);
D
dapan1121 已提交
375 376 377 378 379 380 381 382 383 384 385 386
  SCL_RET(code);
}

int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *output) {
  if (NULL == node->pParameterList || node->pParameterList->length <= 0) {
    sclError("invalid logic parameter list, list:%p, paramNum:%d", node->pParameterList, node->pParameterList ? node->pParameterList->length : 0);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (TSDB_DATA_TYPE_BOOL != node->node.resType.type) {
    sclError("invalid logic resType, type:%d", node->node.resType.type);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
D
dapan1121 已提交
387 388
  }

D
dapan1121 已提交
389 390 391 392 393 394 395
  if (LOGIC_COND_TYPE_NOT == node->condType && node->pParameterList->length > 1) {
    sclError("invalid NOT operation parameter number, paramNum:%d", node->pParameterList->length);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SScalarParam *params = NULL;
  int32_t rowNum = 0;
D
dapan 已提交
396
  int32_t paramNum = 0;
D
dapan1121 已提交
397
  int32_t code = 0;
D
dapan 已提交
398
  SCL_ERR_RET(sclInitParamList(&params, node->pParameterList, ctx, &paramNum, &rowNum));
D
dapan1121 已提交
399 400 401 402
  if (NULL == params) {
    output->numOfRows = 0;
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
403

404 405 406 407 408 409
  int32_t type = node->node.resType.type;
  output->numOfRows = rowNum;

  SDataType t = {.type = type, .bytes = tDataTypes[type].bytes};
  output->columnData = createColumnInfoData(&t, rowNum);
  if (output->columnData == NULL) {
D
dapan1121 已提交
410
    sclError("calloc %d failed", (int32_t)(rowNum * sizeof(bool)));
D
dapan1121 已提交
411 412
    SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
D
dapan1121 已提交
413

D
dapan1121 已提交
414
  bool value = false;
D
dapan 已提交
415
  bool complete = true;
D
dapan1121 已提交
416
  for (int32_t i = 0; i < rowNum; ++i) {
D
dapan 已提交
417 418
    complete = true;
    for (int32_t m = 0; m < paramNum; ++m) {
D
dapan1121 已提交
419
      if (NULL == params[m].columnData) {
D
dapan 已提交
420
        complete = false;
D
dapan1121 已提交
421 422
        continue;
      }
423 424 425
      char* p = colDataGetData(params[m].columnData, i);
      GET_TYPED_DATA(value, bool, params[m].columnData->info.type, p);

D
dapan1121 已提交
426
      if (LOGIC_COND_TYPE_AND == node->condType && (false == value)) {
D
dapan1121 已提交
427
        complete = true;
D
dapan1121 已提交
428 429
        break;
      } else if (LOGIC_COND_TYPE_OR == node->condType && value) {
D
dapan1121 已提交
430
        complete = true;
D
dapan1121 已提交
431 432 433 434 435 436
        break;
      } else if (LOGIC_COND_TYPE_NOT == node->condType) {
        value = !value;
      }
    }

D
dapan 已提交
437 438 439
    if (complete) {
      colDataAppend(output->columnData, i, (char*) &value, false);
    }
D
dapan1121 已提交
440 441
  }

D
dapan1121 已提交
442 443 444 445 446
  if (SCL_IS_CONST_CALC(ctx) && (false == complete)) {
    sclFreeParam(output);
    output->numOfRows = 0;
  }

D
dapan1121 已提交
447
_return:
D
dapan1121 已提交
448

D
dapan 已提交
449
  for (int32_t i = 0; i < paramNum; ++i) {
H
Haojun Liao 已提交
450
//    sclFreeParamNoData(params + i);
D
dapan1121 已提交
451 452
  }

wafwerar's avatar
wafwerar 已提交
453
  taosMemoryFreeClear(params);
D
dapan1121 已提交
454
  SCL_RET(code);
D
dapan1121 已提交
455 456 457 458 459 460
}

int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *output) {
  SScalarParam *params = NULL;
  int32_t rowNum = 0;
  int32_t code = 0;
461

D
dapan1121 已提交
462
  SCL_ERR_RET(sclInitOperatorParams(&params, node, ctx, &rowNum));
463 464 465
  output->columnData = createColumnInfoData(&node->node.resType, rowNum);
  if (output->columnData == NULL) {
    sclError("calloc failed, size:%d", (int32_t)rowNum * node->node.resType.bytes);
D
dapan1121 已提交
466 467 468 469 470
    SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  _bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(node->opType);

D
dapan1121 已提交
471
  int32_t paramNum = scalarGetOperatorParamNum(node->opType);
D
dapan1121 已提交
472 473
  SScalarParam* pLeft = &params[0];
  SScalarParam* pRight = paramNum > 1 ? &params[1] : NULL;
474

D
dapan 已提交
475
  OperatorFn(pLeft, pRight, output, TSDB_ORDER_ASC);
D
dapan1121 已提交
476 477

_return:
D
dapan1121 已提交
478
  for (int32_t i = 0; i < paramNum; ++i) {
479
//    sclFreeParam(&params[i]);
D
dapan1121 已提交
480 481
  }

wafwerar's avatar
wafwerar 已提交
482
  taosMemoryFreeClear(params);
D
dapan1121 已提交
483
  SCL_RET(code);
D
dapan1121 已提交
484 485
}

D
dapan 已提交
486
EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) {
D
dapan1121 已提交
487
  SFunctionNode *node = (SFunctionNode *)*pNode;
D
dapan1121 已提交
488 489 490 491
  SNode* tnode = NULL;
  if (fmIsUserDefinedFunc(node->funcId)) {
    return DEAL_RES_CONTINUE;
  }
492

D
dapan1121 已提交
493 494 495 496 497 498
  FOREACH(tnode, node->pParameterList) {
    if (!SCL_IS_CONST_NODE(tnode)) {
      return DEAL_RES_CONTINUE;
    }
  }

D
dapan1121 已提交
499
  SScalarParam output = {0};
500

501
  ctx->code = sclExecFunction(node, ctx, &output);
D
dapan 已提交
502
  if (ctx->code) {
D
dapan1121 已提交
503 504 505
    return DEAL_RES_ERROR;
  }

D
dapan1121 已提交
506
  SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
D
dapan1121 已提交
507 508
  if (NULL == res) {
    sclError("make value node failed");
D
dapan1121 已提交
509
    sclFreeParam(&output);
D
dapan 已提交
510
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
D
dapan1121 已提交
511 512 513
    return DEAL_RES_ERROR;
  }

514 515
  res->translate = true;

516 517
  if (colDataIsNull_s(output.columnData, 0)) {
    res->node.resType.type = TSDB_DATA_TYPE_NULL;
D
dapan1121 已提交
518
  } else {
519 520 521
    res->node.resType = node->node.resType;
    int32_t type = output.columnData->info.type;
    if (IS_VAR_DATA_TYPE(type)) {
522
      res->datum.p = taosMemoryCalloc(res->node.resType.bytes + VARSTR_HEADER_SIZE + 1, 1);
523
      memcpy(res->datum.p, output.columnData->pData, varDataTLen(output.columnData->pData));
524 525 526
    } else {
      memcpy(nodesGetValueFromNode(res), output.columnData->pData, tDataTypes[type].bytes);
    }
D
dapan1121 已提交
527
  }
528

D
dapan1121 已提交
529 530 531
  nodesDestroyNode(*pNode);
  *pNode = (SNode*)res;

D
dapan1121 已提交
532
  sclFreeParam(&output);
D
dapan1121 已提交
533 534 535
  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
536
EDealRes sclRewriteLogic(SNode** pNode, SScalarCtx *ctx) {
D
dapan1121 已提交
537 538
  SLogicConditionNode *node = (SLogicConditionNode *)*pNode;

H
Haojun Liao 已提交
539
  SScalarParam output = {0};
D
dapan 已提交
540 541
  ctx->code = sclExecLogic(node, ctx, &output);
  if (ctx->code) {
D
dapan1121 已提交
542 543 544
    return DEAL_RES_ERROR;
  }

D
dapan1121 已提交
545 546 547 548
  if (0 == output.numOfRows) {
    return DEAL_RES_CONTINUE;
  }

D
dapan1121 已提交
549
  SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
D
dapan1121 已提交
550 551
  if (NULL == res) {
    sclError("make value node failed");
552
    sclFreeParam(&output);
D
dapan 已提交
553
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
D
dapan1121 已提交
554 555 556 557
    return DEAL_RES_ERROR;
  }

  res->node.resType = node->node.resType;
558
  res->translate = true;
D
dapan1121 已提交
559

560 561 562 563
  int32_t type = output.columnData->info.type;
  if (IS_VAR_DATA_TYPE(type)) {
    res->datum.p = output.columnData->pData;
    output.columnData->pData = NULL;
D
dapan1121 已提交
564
  } else {
565
    memcpy(nodesGetValueFromNode(res), output.columnData->pData, tDataTypes[type].bytes);
D
dapan1121 已提交
566
  }
D
dapan1121 已提交
567 568 569 570

  nodesDestroyNode(*pNode);
  *pNode = (SNode*)res;

D
dapan1121 已提交
571
  sclFreeParam(&output);
D
dapan1121 已提交
572 573 574
  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
575
EDealRes sclRewriteOperator(SNode** pNode, SScalarCtx *ctx) {
D
dapan1121 已提交
576
  SOperatorNode *node = (SOperatorNode *)*pNode;
D
dapan1121 已提交
577

D
dapan1121 已提交
578 579 580 581 582 583 584 585
  if (!SCL_IS_CONST_NODE(node->pLeft)) {
    return DEAL_RES_CONTINUE;
  }

  if (!SCL_IS_CONST_NODE(node->pRight)) {
    return DEAL_RES_CONTINUE;
  }

H
Haojun Liao 已提交
586
  SScalarParam output = {.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData))};
D
dapan 已提交
587 588
  ctx->code = sclExecOperator(node, ctx, &output);
  if (ctx->code) {
D
dapan1121 已提交
589 590 591
    return DEAL_RES_ERROR;
  }

D
dapan1121 已提交
592
  SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
D
dapan1121 已提交
593
  if (NULL == res) {
D
dapan1121 已提交
594 595
    sclError("make value node failed");    
    sclFreeParam(&output);    
D
dapan 已提交
596
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
D
dapan1121 已提交
597 598 599
    return DEAL_RES_ERROR;
  }

D
dapan1121 已提交
600
  res->node.resType = node->node.resType;
601
  res->translate = true;
D
dapan1121 已提交
602

603
  int32_t type = output.columnData->info.type;
604
  if (IS_VAR_DATA_TYPE(type)) {  // todo refactor
605 606
    res->datum.p = output.columnData->pData;
    output.columnData->pData = NULL;
D
dapan1121 已提交
607
  } else {
608
    memcpy(nodesGetValueFromNode(res), output.columnData->pData, tDataTypes[type].bytes);
D
dapan1121 已提交
609
  }
D
dapan1121 已提交
610 611 612 613

  nodesDestroyNode(*pNode);
  *pNode = (SNode*)res;

H
Haojun Liao 已提交
614
  sclFreeParam(&output);
D
dapan1121 已提交
615 616 617 618
  return DEAL_RES_CONTINUE;
}

EDealRes sclConstantsRewriter(SNode** pNode, void* pContext) {
D
dapan1121 已提交
619
  if (QUERY_NODE_VALUE == nodeType(*pNode) || QUERY_NODE_COLUMN == nodeType(*pNode) || QUERY_NODE_NODE_LIST == nodeType(*pNode)) {
D
dapan1121 已提交
620 621 622
    return DEAL_RES_CONTINUE;
  }

D
dapan 已提交
623 624
  SScalarCtx *ctx = (SScalarCtx *)pContext;

D
dapan1121 已提交
625
  if (QUERY_NODE_FUNCTION == nodeType(*pNode)) {
D
dapan 已提交
626
    return sclRewriteFunction(pNode, ctx);
D
dapan1121 已提交
627 628 629
  }

  if (QUERY_NODE_LOGIC_CONDITION == nodeType(*pNode)) {
D
dapan 已提交
630
    return sclRewriteLogic(pNode, ctx);
D
dapan1121 已提交
631 632
  }

D
dapan1121 已提交
633
  if (QUERY_NODE_OPERATOR == nodeType(*pNode)) {
D
dapan 已提交
634
    return sclRewriteOperator(pNode, ctx);
D
dapan1121 已提交
635 636
  }  
  
D
dapan1121 已提交
637
  sclError("invalid node type for calculating constants, type:%d", nodeType(*pNode));
D
dapan 已提交
638
  ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
D
dapan1121 已提交
639
  return DEAL_RES_ERROR;
D
dapan1121 已提交
640 641
}

D
dapan 已提交
642
EDealRes sclWalkFunction(SNode* pNode, SScalarCtx *ctx) {
D
dapan1121 已提交
643
  SFunctionNode *node = (SFunctionNode *)pNode;
D
dapan1121 已提交
644 645
  SScalarParam output = {0};
  
646
  ctx->code = sclExecFunction(node, ctx, &output);
D
dapan1121 已提交
647 648
  if (ctx->code) {
    return DEAL_RES_ERROR;
D
dapan1121 已提交
649 650
  }

D
dapan1121 已提交
651
  if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
D
dapan1121 已提交
652 653
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
D
dapan1121 已提交
654 655
  }

D
dapan1121 已提交
656 657 658
  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
659
EDealRes sclWalkLogic(SNode* pNode, SScalarCtx *ctx) {
D
dapan1121 已提交
660
  SLogicConditionNode *node = (SLogicConditionNode *)pNode;
D
dapan1121 已提交
661 662 663 664 665
  SScalarParam output = {0};
  
  ctx->code = sclExecLogic(node, ctx, &output);
  if (ctx->code) {
    return DEAL_RES_ERROR;
D
dapan1121 已提交
666 667
  }

D
dapan1121 已提交
668
  if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
D
dapan1121 已提交
669
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
D
dapan1121 已提交
670
    return DEAL_RES_ERROR;
D
dapan1121 已提交
671 672 673 674 675
  }

  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
676
EDealRes sclWalkOperator(SNode* pNode, SScalarCtx *ctx) {
D
dapan1121 已提交
677
  SOperatorNode *node = (SOperatorNode *)pNode;
D
dapan1121 已提交
678
  SScalarParam output = {0};
D
dapan1121 已提交
679
  
D
dapan1121 已提交
680 681
  ctx->code = sclExecOperator(node, ctx, &output);
  if (ctx->code) {
D
dapan1121 已提交
682 683
    return DEAL_RES_ERROR;
  }
D
dapan1121 已提交
684

D
dapan1121 已提交
685
  if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
D
dapan1121 已提交
686
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
D
dapan1121 已提交
687 688 689
    return DEAL_RES_ERROR;
  }

D
dapan1121 已提交
690 691 692
  return DEAL_RES_CONTINUE;
}

693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741
EDealRes sclWalkTarget(SNode* pNode, SScalarCtx *ctx) {
  STargetNode *target = (STargetNode *)pNode;

  if (target->dataBlockId >= taosArrayGetSize(ctx->pBlockList)) {
    sclError("target tupleId is too big, tupleId:%d, dataBlockNum:%d", target->dataBlockId, (int32_t)taosArrayGetSize(ctx->pBlockList));
    ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
    return DEAL_RES_ERROR;
  }

  int32_t index = -1;
  for(int32_t i = 0; i < taosArrayGetSize(ctx->pBlockList); ++i) {
    SSDataBlock* pb = taosArrayGetP(ctx->pBlockList, i);
    if (pb->info.blockId == target->dataBlockId) {
      index = i;
      break;
    }
  }

  if (index == -1) {
    sclError("column tupleId is too big, tupleId:%d, dataBlockNum:%d", target->dataBlockId, (int32_t)taosArrayGetSize(ctx->pBlockList));
    ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
    return DEAL_RES_ERROR;
  }

  SSDataBlock *block = *(SSDataBlock **)taosArrayGet(ctx->pBlockList, index);

  if (target->slotId >= taosArrayGetSize(block->pDataBlock)) {
    sclError("target slot not exist, dataBlockId:%d, slotId:%d, dataBlockNum:%d", target->dataBlockId, target->slotId, (int32_t)taosArrayGetSize(block->pDataBlock));
    ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
    return DEAL_RES_ERROR;
  }

  // if block->pDataBlock is not enough, there are problems if target->slotId bigger than the size of block->pDataBlock,
  SColumnInfoData *col = taosArrayGet(block->pDataBlock, target->slotId);

  SScalarParam *res = (SScalarParam *)taosHashGet(ctx->pRes, (void *)&target->pExpr, POINTER_BYTES);
  if (NULL == res) {
    sclError("no valid res in hash, node:%p, type:%d", target->pExpr, nodeType(target->pExpr));
    ctx->code = TSDB_CODE_QRY_APP_ERROR;
    return DEAL_RES_ERROR;
  }

  colDataAssign(col, res->columnData, res->numOfRows);
  block->info.rows = res->numOfRows;

  sclFreeParam(res);
  taosHashRemove(ctx->pRes, (void *)&target->pExpr, POINTER_BYTES);
  return DEAL_RES_CONTINUE;
}
D
dapan 已提交
742

D
dapan1121 已提交
743
EDealRes sclCalcWalker(SNode* pNode, void* pContext) {
X
Xiaoyu Wang 已提交
744
  if (QUERY_NODE_VALUE == nodeType(pNode) || QUERY_NODE_NODE_LIST == nodeType(pNode) || QUERY_NODE_COLUMN == nodeType(pNode)) {
D
dapan1121 已提交
745
    return DEAL_RES_CONTINUE;
D
dapan1121 已提交
746
  }
D
dapan 已提交
747 748

  SScalarCtx *ctx = (SScalarCtx *)pContext;
D
dapan1121 已提交
749
  if (QUERY_NODE_FUNCTION == nodeType(pNode)) {
D
dapan 已提交
750
    return sclWalkFunction(pNode, ctx);
D
dapan1121 已提交
751
  }
D
dapan1121 已提交
752

D
dapan1121 已提交
753
  if (QUERY_NODE_LOGIC_CONDITION == nodeType(pNode)) {
D
dapan 已提交
754
    return sclWalkLogic(pNode, ctx);
D
dapan1121 已提交
755
  }
D
dapan1121 已提交
756

D
dapan1121 已提交
757
  if (QUERY_NODE_OPERATOR == nodeType(pNode)) {
D
dapan 已提交
758
    return sclWalkOperator(pNode, ctx);
D
dapan1121 已提交
759
  }
D
dapan1121 已提交
760

761 762 763
  if (QUERY_NODE_TARGET == nodeType(pNode)) {
    return sclWalkTarget(pNode, ctx);
  }
D
dapan1121 已提交
764

D
dapan 已提交
765
  sclError("invalid node type for scalar calculating, type:%d", nodeType(pNode));
D
dapan1121 已提交
766 767
  ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
  return DEAL_RES_ERROR;
D
dapan1121 已提交
768 769 770 771 772 773 774 775
}

int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes) {
  if (NULL == pNode) {
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t code = 0;
D
dapan 已提交
776
  SScalarCtx ctx = {0};
777
  ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
D
dapan 已提交
778 779 780 781
  if (NULL == ctx.pRes) {
    sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM);
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
D
dapan1121 已提交
782
  
X
Xiaoyu Wang 已提交
783
  nodesRewriteExprPostOrder(&pNode, sclConstantsRewriter, (void *)&ctx);
D
dapan 已提交
784
  SCL_ERR_JRET(ctx.code);
D
dapan1121 已提交
785 786
  *pRes = pNode;

D
dapan 已提交
787 788 789
_return:
  sclFreeRes(ctx.pRes);
  return code;
D
dapan1121 已提交
790 791
}

D
dapan 已提交
792
int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
D
dapan1121 已提交
793
  if (NULL == pNode || NULL == pBlockList) {
D
dapan1121 已提交
794 795 796 797
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t code = 0;
D
dapan 已提交
798
  SScalarCtx ctx = {.code = 0, .pBlockList = pBlockList};
799

800
  // TODO: OPT performance
801
  ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
D
dapan1121 已提交
802 803 804 805
  if (NULL == ctx.pRes) {
    sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM);
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
D
dapan1121 已提交
806
  
X
Xiaoyu Wang 已提交
807
  nodesWalkExprPostOrder(pNode, sclCalcWalker, (void *)&ctx);
D
dapan 已提交
808
  SCL_ERR_JRET(ctx.code);
D
dapan1121 已提交
809

D
dapan1121 已提交
810 811 812 813 814 815 816
  if (pDst) {
    SScalarParam *res = (SScalarParam *)taosHashGet(ctx.pRes, (void *)&pNode, POINTER_BYTES);
    if (NULL == res) {
      sclError("no valid res in hash, node:%p, type:%d", pNode, nodeType(pNode));
      SCL_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }
    
817 818
    colDataAssign(pDst->columnData, res->columnData, res->numOfRows);
    pDst->numOfRows = res->numOfRows;
D
dapan1121 已提交
819
    taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES);
D
dapan1121 已提交
820
  }
D
dapan1121 已提交
821

D
dapan 已提交
822
_return:
D
dapan1121 已提交
823
  //nodesDestroyNode(pNode);
D
dapan 已提交
824 825
  sclFreeRes(ctx.pRes);
  return code;
D
dapan1121 已提交
826
}