scalar.c 23.7 KB
Newer Older
D
dapan1121 已提交
1 2
#include "function.h"
#include "functionMgt.h"
H
Haojun Liao 已提交
3 4
#include "nodes.h"
#include "querynodes.h"
D
dapan1121 已提交
5
#include "sclInt.h"
H
Haojun Liao 已提交
6 7 8
#include "sclvector.h"
#include "tcommon.h"
#include "tdatablock.h"
9
#include "scalar.h"
D
dapan1121 已提交
10

D
dapan1121 已提交
11
// Returns how many operands an operator of the given type takes:
// 1 for the IS [NOT] NULL/TRUE/FALSE/UNKNOWN predicates and for unary minus,
// 2 for every other operator type.
int32_t scalarGetOperatorParamNum(EOperatorType type) {
  switch (type) {
    case OP_TYPE_IS_NULL:
    case OP_TYPE_IS_NOT_NULL:
    case OP_TYPE_IS_TRUE:
    case OP_TYPE_IS_NOT_TRUE:
    case OP_TYPE_IS_FALSE:
    case OP_TYPE_IS_NOT_FALSE:
    case OP_TYPE_IS_UNKNOWN:
    case OP_TYPE_IS_NOT_UNKNOWN:
    case OP_TYPE_MINUS:
      return 1;
    default:
      return 2;
  }
}

21
// Allocates a zeroed SColumnInfoData, copies type/bytes/scale/precision from
// pType and asks colInfoDataEnsureCapacity() to size it for numOfRows rows.
// Returns NULL and sets terrno to TSDB_CODE_OUT_OF_MEMORY on any failure.
SColumnInfoData* createColumnInfoData(SDataType* pType, int32_t numOfRows) {
  SColumnInfoData* pCol = taosMemoryCalloc(1, sizeof(SColumnInfoData));
  if (NULL == pCol) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pCol->info.type      = pType->type;
  pCol->info.bytes     = pType->bytes;
  pCol->info.scale     = pType->scale;
  pCol->info.precision = pType->precision;

  if (colInfoDataEnsureCapacity(pCol, 0, numOfRows) != TSDB_CODE_SUCCESS) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pCol);
    return NULL;
  }

  return pCol;
}

43 44 45 46
// Converts the constant held by pValueNode into the type already configured on
// out->columnData (out->columnData->info must be set by the caller).
// A temporary one-row input column is built from the value node, passed to
// vectorConvertImpl() and released again before returning.
// Returns TSDB_CODE_OUT_OF_MEMORY if the temporary column cannot be created,
// the error from colInfoDataEnsureCapacity() if the output cannot be sized,
// otherwise the result of vectorConvertImpl().
int32_t doConvertDataType(SValueNode* pValueNode, SScalarParam* out) {
  SScalarParam in = {.numOfRows = 1};
  in.columnData = createColumnInfoData(&pValueNode->node.resType, 1);
  if (NULL == in.columnData) {
    // createColumnInfoData() already set terrno; do not touch the NULL column
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  colDataAppend(in.columnData, 0, nodesGetValueFromNode(pValueNode), false);

  int32_t code = colInfoDataEnsureCapacity(out->columnData, 0, 1);
  if (TSDB_CODE_SUCCESS == code) {
    code = vectorConvertImpl(&in, out);
  }

  sclFreeParam(&in);
  return code;
}

D
dapan1121 已提交
55 56 57 58 59 60 61 62 63 64 65 66
// Builds a hash set from the value nodes of a node list (used for IN-style filters).
//   data  - out: receives the new SHashObj on success (left untouched on failure)
//   pNode - SNodeListNode whose pNodeList holds SValueNode entries
//   type  - data type every entry is stored as; entries of a different type are
//           converted through doConvertDataType() first
// For variable-length types the key is the payload (varDataVal/varDataLen),
// otherwise the fixed-width value itself.
// Returns TSDB_CODE_SUCCESS, or an error code after releasing the hash.
int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type) {
  SHashObj *pObj = taosHashInit(256, taosGetDefaultHashFunction(type), true, false);
  if (NULL == pObj) {
    sclError("taosHashInit failed, size:%d", 256);
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  taosHashSetEqualFp(pObj, taosGetDefaultEqualFunction(type));

  int32_t code = 0;
  int32_t len = 0;
  void *buf = NULL;
  SNodeListNode *nodeList = (SNodeListNode *)pNode;
  SListCell *cell = nodeList->pNodeList->pHead;

  // scratch column that receives converted values; released before returning
  SScalarParam out = {.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData))};
  if (NULL == out.columnData) {
    sclError("calloc %d failed", (int32_t)sizeof(SColumnInfoData));
    SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  for (int32_t i = 0; i < nodeList->pNodeList->length; ++i) {
    SValueNode *valueNode = (SValueNode *)cell->pNode;

    if (valueNode->node.resType.type != type) {
      out.columnData->info.type = type;
      out.columnData->info.bytes = tDataTypes[type].bytes;

      code = doConvertDataType(valueNode, &out);
      if (code != TSDB_CODE_SUCCESS) {
        SCL_ERR_JRET(code);
      }

      if (IS_VAR_DATA_TYPE(type)) {
        len = varDataLen(out.columnData->pData);
        buf = varDataVal(out.columnData->pData);
      } else {
        len = tDataTypes[type].bytes;
        buf = out.columnData->pData;
      }
    } else {
      buf = nodesGetValueFromNode(valueNode);
      if (IS_VAR_DATA_TYPE(type)) {
        len = varDataLen(buf);
        buf = varDataVal(buf);
      } else {
        len = valueNode->node.resType.bytes;
      }
    }

    if (taosHashPut(pObj, buf, (size_t)len, NULL, 0)) {
      sclError("taosHashPut failed");
      SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    cell = cell->pNext;
  }

  *data = pObj;
  sclFreeParam(&out);  // the scratch column is no longer needed
  return TSDB_CODE_SUCCESS;

_return:
  sclFreeParam(&out);  // tolerates a NULL columnData
  taosHashCleanup(pObj);
  SCL_RET(code);
}

D
dapan1121 已提交
118 119 120 121 122 123 124
// Releases every SScalarParam stored in the result hash via sclFreeParam(),
// then destroys the hash itself.
void sclFreeRes(SHashObj *res) {
  for (void *pIter = taosHashIterate(res, NULL); NULL != pIter; pIter = taosHashIterate(res, pIter)) {
    sclFreeParam((SScalarParam *)pIter);
  }

  taosHashCleanup(res);
}

D
dapan1121 已提交
132
// Releases the resources referenced by param: the column data (contents and
// the SColumnInfoData struct itself) and the hash filter. The SScalarParam
// struct is not freed. Both pointers are reset to NULL afterwards so that a
// second call on the same param is harmless. A NULL param is ignored.
void sclFreeParam(SScalarParam *param) {
  if (NULL == param) {
    return;
  }

  if (param->columnData != NULL) {
    colDataDestroy(param->columnData);
    taosMemoryFreeClear(param->columnData);
  }

  if (param->pHashFilter != NULL) {
    taosHashCleanup(param->pHashFilter);
    param->pHashFilter = NULL;
  }
}

D
dapan1121 已提交
143 144 145 146 147
// Copies the raw value of pNode into a freshly allocated buffer of
// pNode->node.resType.bytes bytes and stores the buffer in *res.
// A node of type TSDB_DATA_TYPE_NULL is a no-op: *res is left untouched and
// TSDB_CODE_SUCCESS is returned.
// Returns TSDB_CODE_QRY_OUT_OF_MEMORY when the allocation fails.
int32_t sclCopyValueNodeValue(SValueNode *pNode, void **res) {
  if (TSDB_DATA_TYPE_NULL != pNode->node.resType.type) {
    *res = taosMemoryMalloc(pNode->node.resType.bytes);
    if (NULL == (*res)) {
      sclError("malloc %d failed", pNode->node.resType.bytes);
      SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    memcpy(*res, nodesGetValueFromNode(pNode), pNode->node.resType.bytes);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
158 159 160 161
// Fills *param from an expression node and folds its row count into *rowNum.
//   VALUE      - builds a one-row column holding the constant (or NULL)
//   NODE_LIST  - builds a hash filter from the list and records the param in ctx->pRes
//   COLUMN     - points param at the matching column of the block in ctx->pBlockList
//   FUNCTION / OPERATOR / LOGIC_CONDITION - copies the previously computed result from ctx->pRes
// Other node types leave param untouched.
// *rowNum is raised to param->numOfRows when that is larger; two different
// multi-row counts are rejected with TSDB_CODE_QRY_INVALID_INPUT.
// Returns TSDB_CODE_SUCCESS or an error code.
int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t *rowNum) {
  switch (nodeType(node)) {
    case QUERY_NODE_VALUE: {
      SValueNode *valueNode = (SValueNode *)node;

      param->numOfRows = 1;
      param->columnData = createColumnInfoData(&valueNode->node.resType, 1);
      if (NULL == param->columnData) {
        // nothing to append into; report the allocation failure instead of crashing
        sclError("createColumnInfoData failed, type:%d", valueNode->node.resType.type);
        SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      if (TSDB_DATA_TYPE_NULL == valueNode->node.resType.type) {
        colDataAppendNULL(param->columnData, 0);
      } else {
        colDataAppend(param->columnData, 0, nodesGetValueFromNode(valueNode), false);
      }
      break;
    }
    case QUERY_NODE_NODE_LIST: {
      SNodeListNode *nodeList = (SNodeListNode *)node;
      if (LIST_LENGTH(nodeList->pNodeList) <= 0) {
        sclError("invalid length in nodeList, length:%d", LIST_LENGTH(nodeList->pNodeList));
        SCL_RET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      SCL_ERR_RET(scalarGenerateSetFromList((void**) &param->pHashFilter, node, nodeList->dataType.type));
      if (taosHashPut(ctx->pRes, &node, POINTER_BYTES, param, sizeof(*param))) {
        taosHashCleanup(param->pHashFilter);
        param->pHashFilter = NULL;  // do not leave a dangling pointer behind
        sclError("taosHashPut nodeList failed, size:%d", (int32_t)sizeof(*param));
        return TSDB_CODE_QRY_OUT_OF_MEMORY;
      }
      break;
    }
    case QUERY_NODE_COLUMN: {
      if (NULL == ctx->pBlockList) {
        sclError("invalid node type for constant calculating, type:%d, src:%p", nodeType(node), ctx->pBlockList);
        SCL_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
      }

      SColumnNode *ref = (SColumnNode *)node;

      // locate the block whose blockId matches the column's dataBlockId
      int32_t index = -1;
      for(int32_t i = 0; i < taosArrayGetSize(ctx->pBlockList); ++i) {
        SSDataBlock* pb = taosArrayGetP(ctx->pBlockList, i);
        if (pb->info.blockId == ref->dataBlockId) {
          index = i;
          break;
        }
      }

      if (index == -1) {
        sclError("column tupleId is too big, tupleId:%d, dataBlockNum:%d", ref->dataBlockId, (int32_t)taosArrayGetSize(ctx->pBlockList));
        SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      SSDataBlock *block = *(SSDataBlock **)taosArrayGet(ctx->pBlockList, index);
      if (NULL == block || ref->slotId >= block->info.numOfCols) {
        // block may be NULL here, so it must not be dereferenced for the log message
        int32_t blockSize = (NULL == block) ? 0 : (int32_t)taosArrayGetSize(block->pDataBlock);
        sclError("column slotId is too big, slodId:%d, dataBlockSize:%d", ref->slotId, blockSize);
        SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      SColumnInfoData *columnData = (SColumnInfoData *)taosArrayGet(block->pDataBlock, ref->slotId);
      param->numOfRows = block->info.rows;
      param->columnData = columnData;
      break;
    }
    case QUERY_NODE_FUNCTION:
    case QUERY_NODE_OPERATOR:
    case QUERY_NODE_LOGIC_CONDITION: {
      SScalarParam *res = (SScalarParam *)taosHashGet(ctx->pRes, &node, POINTER_BYTES);
      if (NULL == res) {
        sclError("no result for node, type:%d, node:%p", nodeType(node), node);
        SCL_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
      }
      *param = *res;
      break;
    }
    default:
      break;
  }

  if (param->numOfRows > *rowNum) {
    if ((1 != param->numOfRows) && (1 < *rowNum)) {
      sclError("different row nums, rowNum:%d, newRowNum:%d", *rowNum, param->numOfRows);
      SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    *rowNum = param->numOfRows;
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
247
// Allocates and initialises the parameter array for a function/logic node.
//   pParams    - out: receives the array (NULL when the resulting *rowNum is 0)
//   pParamList - parameter nodes; may be NULL, in which case a single
//                parameter carrying only a row count is produced
//   paramNum   - out: number of entries allocated
//   rowNum     - in/out: row count, updated by sclInitParam() for each parameter
// With a NULL pParamList the row count comes from the first block of
// ctx->pBlockList, or is 1 when there is no block list.
// Returns TSDB_CODE_SUCCESS or an error code (the array is freed on error).
int32_t sclInitParamList(SScalarParam **pParams, SNodeList* pParamList, SScalarCtx *ctx, int32_t *paramNum, int32_t *rowNum) {
  int32_t code = 0;
  if (NULL == pParamList) {
    if (ctx->pBlockList) {
      // pBlockList stores SSDataBlock pointers, so fetch the element value, not its slot address
      SSDataBlock *pBlock = taosArrayGetP(ctx->pBlockList, 0);
      *rowNum = pBlock->info.rows;
    } else {
      *rowNum = 1;
    }

    *paramNum = 1;
  } else {
    *paramNum = pParamList->length;
  }

  SScalarParam *paramList = taosMemoryCalloc(*paramNum, sizeof(SScalarParam));
  if (NULL == paramList) {
    sclError("calloc %d failed", (int32_t)((*paramNum) * sizeof(SScalarParam)));
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  if (pParamList) {
    SNode *tnode = NULL;
    int32_t i = 0;
    if (SCL_IS_CONST_CALC(ctx)) {
      // constant folding: only constant parameters are evaluated, and they are
      // erased from the list once consumed
      WHERE_EACH (tnode, pParamList) {
        if (!SCL_IS_CONST_NODE(tnode)) {
          WHERE_NEXT;
        } else {
          SCL_ERR_JRET(sclInitParam(tnode, &paramList[i], ctx, rowNum));
          ERASE_NODE(pParamList);
        }

        ++i;
      }
    } else {
      FOREACH(tnode, pParamList) {
        SCL_ERR_JRET(sclInitParam(tnode, &paramList[i], ctx, rowNum));
        ++i;
      }
    }
  } else {
    paramList[0].numOfRows = *rowNum;
  }

  if (0 == *rowNum) {
    // nothing to compute: hand back a NULL array
    taosMemoryFreeClear(paramList);
  }

  *pParams = paramList;
  return TSDB_CODE_SUCCESS;

_return:
  taosMemoryFreeClear(paramList);
  SCL_RET(code);
}

// Allocates the operand array for an operator node and initialises it from
// node->pLeft (and node->pRight for two-operand operators) via sclInitParam().
// On success *pParams owns the array; on failure the array is freed and an
// error code is returned.
int32_t sclInitOperatorParams(SScalarParam **pParams, SOperatorNode *node, SScalarCtx *ctx, int32_t *rowNum) {
  int32_t code = 0;
  int32_t operandNum = scalarGetOperatorParamNum(node->opType);

  if (NULL == node->pLeft || (operandNum == 2 && NULL == node->pRight)) {
    sclError("invalid operation node, left:%p, right:%p", node->pLeft, node->pRight);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SScalarParam *operands = taosMemoryCalloc(operandNum, sizeof(SScalarParam));
  if (NULL == operands) {
    sclError("calloc %d failed", (int32_t)(operandNum * sizeof(SScalarParam)));
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCL_ERR_JRET(sclInitParam(node->pLeft, operands, ctx, rowNum));
  if (operandNum > 1) {
    SCL_ERR_JRET(sclInitParam(node->pRight, operands + 1, ctx, rowNum));
  }

  *pParams = operands;
  return TSDB_CODE_SUCCESS;

_return:
  taosMemoryFreeClear(operands);
  SCL_RET(code);
}

331
// Evaluates a scalar function node.
// Looks up the function's executor by node->funcId, builds its parameters with
// sclInitParamList(), creates output->columnData sized for the resulting row
// count and invokes the executor's process callback.
// On failure any output column created here is released and
// output->columnData is reset to NULL. The parameter array is always freed.
// Returns TSDB_CODE_SUCCESS or an error code.
int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *output) {
  SScalarFuncExecFuncs ffpSet = {0};
  int32_t code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet);
  if (code) {
    sclError("fmGetFuncExecFuncs failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
    SCL_ERR_RET(code);
  }

  SScalarParam *params = NULL;
  int32_t rowNum = 0;
  int32_t paramNum = 0;
  SCL_ERR_RET(sclInitParamList(&params, node->pParameterList, ctx, &paramNum, &rowNum));

  output->columnData = createColumnInfoData(&node->node.resType, rowNum);
  if (output->columnData == NULL) {
    // columnData is NULL here, so take the width from the node's result type
    sclError("calloc %d failed", (int32_t)(rowNum * node->node.resType.bytes));
    SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  code = (*ffpSet.process)(params, paramNum, output);
  if (code) {
    sclError("scalar function exec failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
    SCL_ERR_JRET(code);
  }

_return:
  if (code && NULL != output->columnData) {
    // callers drop the output on error, so release what was allocated here
    colDataDestroy(output->columnData);
    taosMemoryFreeClear(output->columnData);
  }

  taosMemoryFreeClear(params);
  SCL_RET(code);
}

// Evaluates an AND / OR / NOT logic condition row by row into a BOOL column.
// Validation: the parameter list must be non-empty, the node's result type
// must be BOOL, and NOT accepts a single parameter only.
// For each row the parameters are scanned in order; AND stops at the first
// false, OR at the first true, NOT negates the value. A parameter without
// column data marks the row as not complete unless a later parameter
// short-circuits; only complete rows are appended to the output.
// During constant calculation, if the last row was not complete, the output
// is released and output->numOfRows is set to 0.
// When sclInitParamList() yields no parameters, output->numOfRows is set to 0
// and TSDB_CODE_SUCCESS is returned.
int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *output) {
  if (NULL == node->pParameterList || node->pParameterList->length <= 0) {
    sclError("invalid logic parameter list, list:%p, paramNum:%d", node->pParameterList, node->pParameterList ? node->pParameterList->length : 0);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (TSDB_DATA_TYPE_BOOL != node->node.resType.type) {
    sclError("invalid logic resType, type:%d", node->node.resType.type);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (LOGIC_COND_TYPE_NOT == node->condType && node->pParameterList->length > 1) {
    sclError("invalid NOT operation parameter number, paramNum:%d", node->pParameterList->length);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SScalarParam *params = NULL;
  int32_t rowNum = 0;
  int32_t paramNum = 0;
  int32_t code = 0;
  SCL_ERR_RET(sclInitParamList(&params, node->pParameterList, ctx, &paramNum, &rowNum));
  if (NULL == params) {
    output->numOfRows = 0;
    return TSDB_CODE_SUCCESS;
  }

  int32_t type = node->node.resType.type;
  output->numOfRows = rowNum;

  SDataType t = {.type = type, .bytes = tDataTypes[type].bytes};
  output->columnData = createColumnInfoData(&t, rowNum);
  if (output->columnData == NULL) {
    sclError("calloc %d failed", (int32_t)(rowNum * sizeof(bool)));
    SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  bool cond = false;
  bool decided = true;
  for (int32_t row = 0; row < rowNum; ++row) {
    decided = true;

    for (int32_t idx = 0; idx < paramNum; ++idx) {
      SColumnInfoData *pCol = params[idx].columnData;
      if (NULL == pCol) {
        decided = false;
        continue;
      }

      char *cell = colDataGetData(pCol, row);
      GET_TYPED_DATA(cond, bool, pCol->info.type, cell);

      // short-circuit: AND is settled by a false operand, OR by a true one
      if ((LOGIC_COND_TYPE_AND == node->condType && !cond) ||
          (LOGIC_COND_TYPE_OR == node->condType && cond)) {
        decided = true;
        break;
      }

      if (LOGIC_COND_TYPE_NOT == node->condType) {
        cond = !cond;
      }
    }

    if (decided) {
      colDataAppend(output->columnData, row, (char*) &cond, false);
    }
  }

  if (SCL_IS_CONST_CALC(ctx) && !decided) {
    sclFreeParam(output);
    output->numOfRows = 0;
  }

_return:
  taosMemoryFreeClear(params);
  SCL_RET(code);
}

// Evaluates an operator node.
// Builds the operand array with sclInitOperatorParams(), creates
// output->columnData sized for the resulting row count, then calls the
// operator implementation returned by getBinScalarOperatorFn(). The right
// operand is passed as NULL for single-operand operators.
// The operand array is always freed before returning.
// Returns TSDB_CODE_SUCCESS or an error code.
int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *output) {
  SScalarParam *params = NULL;
  int32_t rowNum = 0;
  int32_t code = 0;
  // initialised up front so that no path reaches _return with it indeterminate
  int32_t paramNum = scalarGetOperatorParamNum(node->opType);

  SCL_ERR_RET(sclInitOperatorParams(&params, node, ctx, &rowNum));
  output->columnData = createColumnInfoData(&node->node.resType, rowNum);
  if (output->columnData == NULL) {
    sclError("calloc failed, size:%d", (int32_t)rowNum * node->node.resType.bytes);
    SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  _bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(node->opType);

  SScalarParam* pLeft = &params[0];
  SScalarParam* pRight = paramNum > 1 ? &params[1] : NULL;

  OperatorFn(pLeft, pRight, output, TSDB_ORDER_ASC);

_return:
  taosMemoryFreeClear(params);
  SCL_RET(code);
}

D
dapan 已提交
474
// Constant-folds a function node: when the function is not user defined and
// every parameter is a constant node, the function is executed and *pNode is
// replaced by a value node holding the result (a NULL-typed value node when
// the first result row is NULL). Otherwise the node is left as is.
// On failure ctx->code is set and DEAL_RES_ERROR is returned.
EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) {
  SFunctionNode *pFunc = (SFunctionNode *)*pNode;
  if (fmIsUserDefinedFunc(pFunc->funcId)) {
    return DEAL_RES_CONTINUE;
  }

  SNode* pArg = NULL;
  FOREACH(pArg, pFunc->pParameterList) {
    if (!SCL_IS_CONST_NODE(pArg)) {
      return DEAL_RES_CONTINUE;
    }
  }

  SScalarParam output = {0};
  ctx->code = sclExecFunction(pFunc, ctx, &output);
  if (ctx->code) {
    return DEAL_RES_ERROR;
  }

  SValueNode *pValue = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
  if (NULL == pValue) {
    sclError("make value node failed");
    sclFreeParam(&output);
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  if (!colDataIsNull_s(output.columnData, 0)) {
    pValue->node.resType = pFunc->node.resType;

    int32_t type = output.columnData->info.type;
    if (IS_VAR_DATA_TYPE(type)) {
      // hand the buffer over to the value node instead of copying it
      pValue->datum.p = output.columnData->pData;
      output.columnData->pData = NULL;
    } else {
      memcpy(nodesGetValueFromNode(pValue), output.columnData->pData, tDataTypes[type].bytes);
    }
  } else {
    pValue->node.resType.type = TSDB_DATA_TYPE_NULL;
  }

  nodesDestroyNode(*pNode);
  *pNode = (SNode*)pValue;

  sclFreeParam(&output);
  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
522
// Constant-folds a logic condition node: the condition is evaluated and, when
// it produced at least one row, *pNode is replaced by a value node holding the
// result. A zero-row result leaves the node unchanged.
// On failure ctx->code is set and DEAL_RES_ERROR is returned.
EDealRes sclRewriteLogic(SNode** pNode, SScalarCtx *ctx) {
  SLogicConditionNode *pLogic = (SLogicConditionNode *)*pNode;
  SScalarParam output = {0};

  ctx->code = sclExecLogic(pLogic, ctx, &output);
  if (ctx->code) {
    return DEAL_RES_ERROR;
  }

  if (0 == output.numOfRows) {
    return DEAL_RES_CONTINUE;
  }

  SValueNode *pValue = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
  if (NULL == pValue) {
    sclError("make value node failed");
    sclFreeParam(&output);
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  pValue->node.resType = pLogic->node.resType;

  int32_t type = output.columnData->info.type;
  if (!IS_VAR_DATA_TYPE(type)) {
    memcpy(nodesGetValueFromNode(pValue), output.columnData->pData, tDataTypes[type].bytes);
  } else {
    // hand the buffer over to the value node instead of copying it
    pValue->datum.p = output.columnData->pData;
    output.columnData->pData = NULL;
  }

  nodesDestroyNode(*pNode);
  *pNode = (SNode*)pValue;

  sclFreeParam(&output);
  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
560
// Constant-folds an operator node: when both operands are constant nodes the
// operator is executed and *pNode is replaced by a value node holding the
// result. Otherwise the node is left unchanged.
// On failure ctx->code is set and DEAL_RES_ERROR is returned.
EDealRes sclRewriteOperator(SNode** pNode, SScalarCtx *ctx) {
  SOperatorNode *node = (SOperatorNode *)*pNode;

  if (!SCL_IS_CONST_NODE(node->pLeft)) {
    return DEAL_RES_CONTINUE;
  }

  if (!SCL_IS_CONST_NODE(node->pRight)) {
    return DEAL_RES_CONTINUE;
  }

  // sclExecOperator() allocates output.columnData itself; pre-allocating one
  // here would only be overwritten and leaked
  SScalarParam output = {0};
  ctx->code = sclExecOperator(node, ctx, &output);
  if (ctx->code) {
    return DEAL_RES_ERROR;
  }

  SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
  if (NULL == res) {
    sclError("make value node failed");
    sclFreeParam(&output);
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  res->node.resType = node->node.resType;

  int32_t type = output.columnData->info.type;
  if (IS_VAR_DATA_TYPE(type)) {  // todo refactor
    // hand the buffer over to the value node instead of copying it
    res->datum.p = output.columnData->pData;
    output.columnData->pData = NULL;
  } else {
    memcpy(nodesGetValueFromNode(res), output.columnData->pData, tDataTypes[type].bytes);
  }

  nodesDestroyNode(*pNode);
  *pNode = (SNode*)res;

  sclFreeParam(&output);
  return DEAL_RES_CONTINUE;
}

// Rewriter callback for constant folding. Value, column and node-list nodes
// are skipped; function, logic-condition and operator nodes are dispatched to
// their rewrite routine. Any other node type sets ctx->code to
// TSDB_CODE_QRY_INVALID_INPUT and aborts with DEAL_RES_ERROR.
// pContext is the SScalarCtx of the current calculation.
EDealRes sclConstantsRewriter(SNode** pNode, void* pContext) {
  SScalarCtx *ctx = (SScalarCtx *)pContext;

  switch (nodeType(*pNode)) {
    case QUERY_NODE_VALUE:
    case QUERY_NODE_COLUMN:
    case QUERY_NODE_NODE_LIST:
      return DEAL_RES_CONTINUE;
    case QUERY_NODE_FUNCTION:
      return sclRewriteFunction(pNode, ctx);
    case QUERY_NODE_LOGIC_CONDITION:
      return sclRewriteLogic(pNode, ctx);
    case QUERY_NODE_OPERATOR:
      return sclRewriteOperator(pNode, ctx);
    default:
      break;
  }

  sclError("invalid node type for calculating constants, type:%d", nodeType(*pNode));
  ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
  return DEAL_RES_ERROR;
}

D
dapan 已提交
626
// Executes a function node and stores its result in ctx->pRes, keyed by the
// node pointer. On failure ctx->code is set and DEAL_RES_ERROR is returned.
EDealRes sclWalkFunction(SNode* pNode, SScalarCtx *ctx) {
  SFunctionNode *node = (SFunctionNode *)pNode;
  SScalarParam output = {0};

  ctx->code = sclExecFunction(node, ctx, &output);
  if (ctx->code) {
    return DEAL_RES_ERROR;
  }

  if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
    // the result never reached the hash, so it has to be released here
    sclFreeParam(&output);
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
643
// Evaluates a logic condition node and stores its result in ctx->pRes, keyed
// by the node pointer. On failure ctx->code is set and DEAL_RES_ERROR is
// returned.
EDealRes sclWalkLogic(SNode* pNode, SScalarCtx *ctx) {
  SLogicConditionNode *node = (SLogicConditionNode *)pNode;
  SScalarParam output = {0};

  ctx->code = sclExecLogic(node, ctx, &output);
  if (ctx->code) {
    return DEAL_RES_ERROR;
  }

  if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
    // the result never reached the hash, so it has to be released here
    sclFreeParam(&output);
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
660
// Evaluates an operator node and stores its result in ctx->pRes, keyed by the
// node pointer. On failure ctx->code is set and DEAL_RES_ERROR is returned.
EDealRes sclWalkOperator(SNode* pNode, SScalarCtx *ctx) {
  SOperatorNode *node = (SOperatorNode *)pNode;
  SScalarParam output = {0};

  ctx->code = sclExecOperator(node, ctx, &output);
  if (ctx->code) {
    return DEAL_RES_ERROR;
  }

  if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
    // the result never reached the hash, so it has to be released here
    sclFreeParam(&output);
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  return DEAL_RES_CONTINUE;
}

677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725
// Writes the computed result of a target node's expression into the output
// block: the block is looked up in ctx->pBlockList by blockId, the result for
// target->pExpr is fetched from ctx->pRes, assigned to the column at
// target->slotId, and the block's row count is updated. The result is then
// released and removed from ctx->pRes.
// On any lookup failure ctx->code is set and DEAL_RES_ERROR is returned.
EDealRes sclWalkTarget(SNode* pNode, SScalarCtx *ctx) {
  STargetNode *pTarget = (STargetNode *)pNode;

  if (pTarget->dataBlockId >= taosArrayGetSize(ctx->pBlockList)) {
    sclError("target tupleId is too big, tupleId:%d, dataBlockNum:%d", pTarget->dataBlockId, (int32_t)taosArrayGetSize(ctx->pBlockList));
    ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
    return DEAL_RES_ERROR;
  }

  // find the position of the block whose id matches the target
  int32_t blockIdx = -1;
  for (int32_t k = 0; k < taosArrayGetSize(ctx->pBlockList); ++k) {
    SSDataBlock* pCandidate = taosArrayGetP(ctx->pBlockList, k);
    if (pCandidate->info.blockId == pTarget->dataBlockId) {
      blockIdx = k;
      break;
    }
  }

  if (-1 == blockIdx) {
    sclError("column tupleId is too big, tupleId:%d, dataBlockNum:%d", pTarget->dataBlockId, (int32_t)taosArrayGetSize(ctx->pBlockList));
    ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
    return DEAL_RES_ERROR;
  }

  SSDataBlock *pBlock = *(SSDataBlock **)taosArrayGet(ctx->pBlockList, blockIdx);

  if (pTarget->slotId >= taosArrayGetSize(pBlock->pDataBlock)) {
    sclError("target slot not exist, dataBlockId:%d, slotId:%d, dataBlockNum:%d", pTarget->dataBlockId, pTarget->slotId, (int32_t)taosArrayGetSize(pBlock->pDataBlock));
    ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
    return DEAL_RES_ERROR;
  }

  // slotId was range-checked above, so this lookup stays inside pDataBlock
  SColumnInfoData *pDstCol = taosArrayGet(pBlock->pDataBlock, pTarget->slotId);

  SScalarParam *pResult = (SScalarParam *)taosHashGet(ctx->pRes, (void *)&pTarget->pExpr, POINTER_BYTES);
  if (NULL == pResult) {
    sclError("no valid res in hash, node:%p, type:%d", pTarget->pExpr, nodeType(pTarget->pExpr));
    ctx->code = TSDB_CODE_QRY_APP_ERROR;
    return DEAL_RES_ERROR;
  }

  colDataAssign(pDstCol, pResult->columnData, pResult->numOfRows);
  pBlock->info.rows = pResult->numOfRows;

  sclFreeParam(pResult);
  taosHashRemove(ctx->pRes, (void *)&pTarget->pExpr, POINTER_BYTES);
  return DEAL_RES_CONTINUE;
}
D
dapan 已提交
726

D
dapan1121 已提交
727
// Walker callback for scalar calculation. Value, node-list and column nodes
// are skipped; function, logic-condition, operator and target nodes are
// dispatched to their walk routine. Any other node type sets ctx->code to
// TSDB_CODE_QRY_INVALID_INPUT and aborts with DEAL_RES_ERROR.
// pContext is the SScalarCtx of the current calculation.
EDealRes sclCalcWalker(SNode* pNode, void* pContext) {
  SScalarCtx *ctx = (SScalarCtx *)pContext;

  switch (nodeType(pNode)) {
    case QUERY_NODE_VALUE:
    case QUERY_NODE_NODE_LIST:
    case QUERY_NODE_COLUMN:
      return DEAL_RES_CONTINUE;
    case QUERY_NODE_FUNCTION:
      return sclWalkFunction(pNode, ctx);
    case QUERY_NODE_LOGIC_CONDITION:
      return sclWalkLogic(pNode, ctx);
    case QUERY_NODE_OPERATOR:
      return sclWalkOperator(pNode, ctx);
    case QUERY_NODE_TARGET:
      return sclWalkTarget(pNode, ctx);
    default:
      break;
  }

  sclError("invalid node type for scalar calculating, type:%d", nodeType(pNode));
  ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
  return DEAL_RES_ERROR;
}

// Folds the constant sub-expressions of pNode by rewriting the tree in
// post-order with sclConstantsRewriter. On success *pRes receives the
// (possibly replaced) root node. The temporary result hash is always released.
// Returns TSDB_CODE_QRY_INVALID_INPUT for a NULL pNode,
// TSDB_CODE_QRY_OUT_OF_MEMORY if the hash cannot be created, or the error
// recorded by the rewriter.
int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes) {
  if (NULL == pNode) {
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t code = 0;
  SScalarCtx ctx = {0};

  ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (NULL == ctx.pRes) {
    sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM);
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  nodesRewriteExprPostOrder(&pNode, sclConstantsRewriter, (void *)&ctx);
  SCL_ERR_JRET(ctx.code);

  *pRes = pNode;

_return:
  sclFreeRes(ctx.pRes);
  return code;
}

D
dapan 已提交
776
// Evaluates the expression tree pNode against the data blocks in pBlockList by
// walking it in post-order with sclCalcWalker.
//   pDst - optional; when non-NULL the result computed for the root node is
//          assigned to pDst->columnData and pDst->numOfRows is set.
// The temporary result hash is always released before returning.
// Returns TSDB_CODE_QRY_INVALID_INPUT for a NULL pNode or pBlockList,
// TSDB_CODE_QRY_OUT_OF_MEMORY if the hash cannot be created,
// TSDB_CODE_QRY_APP_ERROR if no result exists for the root node, or the error
// recorded by the walker.
int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
  if (NULL == pNode || NULL == pBlockList) {
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t code = 0;
  SScalarCtx ctx = {.code = 0, .pBlockList = pBlockList};

  // TODO: OPT performance
  ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (NULL == ctx.pRes) {
    sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM);
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  nodesWalkExprPostOrder(pNode, sclCalcWalker, (void *)&ctx);
  SCL_ERR_JRET(ctx.code);

  if (pDst) {
    SScalarParam *res = (SScalarParam *)taosHashGet(ctx.pRes, (void *)&pNode, POINTER_BYTES);
    if (NULL == res) {
      sclError("no valid res in hash, node:%p, type:%d", pNode, nodeType(pNode));
      SCL_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }

    colDataAssign(pDst->columnData, res->columnData, res->numOfRows);
    pDst->numOfRows = res->numOfRows;

    // release the entry before dropping it from the hash (same order as
    // sclWalkTarget); once removed, sclFreeRes() can no longer reach it
    sclFreeParam(res);
    taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES);
  }

_return:
  sclFreeRes(ctx.pRes);
  return code;
}