scalar.c 22.0 KB
Newer Older
D
dapan1121 已提交
1 2
#include "function.h"
#include "functionMgt.h"
H
Haojun Liao 已提交
3 4
#include "nodes.h"
#include "querynodes.h"
D
dapan1121 已提交
5
#include "sclInt.h"
H
Haojun Liao 已提交
6 7 8
#include "sclvector.h"
#include "tcommon.h"
#include "tdatablock.h"
D
dapan1121 已提交
9

D
dapan1121 已提交
10
/**
 * Returns how many operands the given operator consumes.
 *
 * The IS [NOT] NULL / TRUE / FALSE / UNKNOWN predicates take a single operand;
 * every other operator type is reported as taking two.
 */
int32_t scalarGetOperatorParamNum(EOperatorType type) {
  switch (type) {
    case OP_TYPE_IS_NULL:
    case OP_TYPE_IS_NOT_NULL:
    case OP_TYPE_IS_TRUE:
    case OP_TYPE_IS_NOT_TRUE:
    case OP_TYPE_IS_FALSE:
    case OP_TYPE_IS_NOT_FALSE:
    case OP_TYPE_IS_UNKNOWN:
    case OP_TYPE_IS_NOT_UNKNOWN:
      return 1;

    default:
      return 2;
  }
}

19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
/**
 * Allocates a zeroed SColumnInfoData, copies the type description from pType
 * into it and reserves room for numOfRows rows.
 *
 * @param pType      type/bytes/scale/precision to copy into the column info
 * @param numOfRows  capacity requested from blockDataEnsureColumnCapacity
 * @return the new column, or NULL with terrno set to TSDB_CODE_OUT_OF_MEMORY
 *         when either the struct or its row capacity cannot be obtained
 */
SColumnInfoData* createColumnInfoData(SDataType* pType, int32_t numOfRows) {
  SColumnInfoData* pCol = calloc(1, sizeof(SColumnInfoData));
  if (NULL == pCol) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pCol->info.type      = pType->type;
  pCol->info.bytes     = pType->bytes;
  pCol->info.scale     = pType->scale;
  pCol->info.precision = pType->precision;

  if (TSDB_CODE_SUCCESS == blockDataEnsureColumnCapacity(pCol, numOfRows)) {
    return pCol;
  }

  // Capacity could not be reserved: release the struct and report OOM.
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  tfree(pCol);
  return NULL;
}

/**
 * Converts the constant held by pValueNode into the type already described by
 * out->columnData->info.
 *
 * A fresh single-row column is created for the source value and stored in
 * in->columnData (any pointer previously stored there is overwritten, not
 * freed); out->columnData is grown to hold one row and vectorConvertImpl
 * performs the conversion.
 *
 * @param in          scratch input parameter; columnData/numOfRows are overwritten
 * @param out         destination; columnData must be non-NULL with info.type/bytes set
 * @param pValueNode  constant to convert
 * @return TSDB_CODE_OUT_OF_MEMORY if the source column cannot be created, the
 *         error from blockDataEnsureColumnCapacity if the destination cannot
 *         be sized, otherwise whatever vectorConvertImpl returns
 */
int32_t doConvertDataType(SScalarParam* in, SScalarParam* out, SValueNode* pValueNode) {
  in->columnData = createColumnInfoData(&pValueNode->node.resType, 1);
  if (NULL == in->columnData) {
    // createColumnInfoData has already set terrno; do not touch a NULL column.
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  colDataAppend(in->columnData, 0, nodesGetValueFromNode(pValueNode), false);
  in->numOfRows = 1;

  int32_t code = blockDataEnsureColumnCapacity(out->columnData, 1);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  return vectorConvertImpl(in, out);
}

D
dapan1121 已提交
50 51 52 53 54 55 56 57 58 59 60 61
int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type) {
  SHashObj *pObj = taosHashInit(256, taosGetDefaultHashFunction(type), true, false);
  if (NULL == pObj) {
    sclError("taosHashInit failed, size:%d", 256);
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  taosHashSetEqualFp(pObj, taosGetDefaultEqualFunction(type)); 

  int32_t code = 0;
  SNodeListNode *nodeList = (SNodeListNode *)pNode;
  SListCell *cell = nodeList->pNodeList->pHead;
62 63 64

  SScalarParam in = {.columnData = calloc(1, sizeof(SColumnInfoData))}, out = {.columnData = calloc(1, sizeof(SColumnInfoData))};

D
dapan1121 已提交
65 66 67 68 69 70 71
  int32_t len = 0;
  void *buf = NULL;
  
  for (int32_t i = 0; i < nodeList->pNodeList->length; ++i) {
    SValueNode *valueNode = (SValueNode *)cell->pNode;

    if (valueNode->node.resType.type != type) {
72 73 74
      out.columnData->info.type = type;
      out.columnData->info.bytes = tDataTypes[type].bytes;

75
      doConvertDataType(&in, &out, valueNode);
D
dapan1121 已提交
76
      if (code) {
77
//        sclError("convert data from %d to %d failed", in.type, out.type);
D
dapan1121 已提交
78 79 80 81
        SCL_ERR_JRET(code);
      }

      if (IS_VAR_DATA_TYPE(type)) {
82 83
        len = varDataLen(out.columnData->pData);
        buf = varDataVal(out.columnData->pData);
D
dapan1121 已提交
84 85
      } else {
        len = tDataTypes[type].bytes;
86
        buf = out.columnData->pData;
D
dapan1121 已提交
87 88 89
      }
    } else {
      buf = nodesGetValueFromNode(valueNode);
D
dapan1121 已提交
90 91 92 93 94
      if (IS_VAR_DATA_TYPE(type)) {
        len = varDataLen(buf);
        buf = varDataVal(buf);
      } else {
        len = valueNode->node.resType.bytes;
95
      }
D
dapan1121 已提交
96 97
    }
    
98
    if (taosHashPut(pObj, buf, (size_t)len, NULL, 0)) {
D
dapan1121 已提交
99 100 101
      sclError("taosHashPut failed");
      SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
D
dapan1121 已提交
102 103

    cell = cell->pNext;
D
dapan1121 已提交
104 105 106 107 108 109 110 111 112 113
  }

  *data = pObj;
  return TSDB_CODE_SUCCESS;

_return:
  taosHashCleanup(pObj);
  SCL_RET(code);
}

D
dapan1121 已提交
114 115 116 117 118 119 120
/**
 * Passes every SScalarParam stored in the result hash to sclFreeParam and
 * then destroys the hash itself.
 */
void sclFreeRes(SHashObj *res) {
  for (void *pIter = taosHashIterate(res, NULL); NULL != pIter; pIter = taosHashIterate(res, pIter)) {
    // Each hash entry's payload is a SScalarParam.
    sclFreeParam((SScalarParam *)pIter);
  }

  taosHashCleanup(res);
}

D
dapan1121 已提交
128
/**
 * Releases the bookkeeping of a parameter without touching its data.
 *
 * Currently a no-op: the only release it used to perform is disabled below.
 */
void sclFreeParamNoData(SScalarParam *param) {
  (void)param;  // intentionally unused while the release below stays disabled
  //  tfree(param->bitmap);
}

/**
 * Releases a parameter. At present this only delegates to
 * sclFreeParamNoData; the data-release logic is disabled and kept below for
 * reference.
 */
void sclFreeParam(SScalarParam *param) {
  sclFreeParamNoData(param);

  // Disabled data release:
  //  if (!param->dataInBlock) {
  //    if (SCL_DATA_TYPE_DUMMY_HASH == param->type) {
  //      taosHashCleanup((SHashObj *)param->orig.data);
  //    } else {
  //      tfree(param->orig.data);
  //    }
  //  }
}

D
dapan1121 已提交
143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
/**
 * Duplicates the raw value of a constant node into a freshly malloc'ed buffer.
 *
 * @param pNode  source value node
 * @param res    receives the buffer (resType.bytes long); left untouched when
 *               the node's type is TSDB_DATA_TYPE_NULL
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY if malloc fails
 */
int32_t sclCopyValueNodeValue(SValueNode *pNode, void **res) {
  if (TSDB_DATA_TYPE_NULL != pNode->node.resType.type) {
    *res = malloc(pNode->node.resType.bytes);
    if (NULL == *res) {
      sclError("malloc %d failed", pNode->node.resType.bytes);
      SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }

    memcpy(*res, nodesGetValueFromNode(pNode), pNode->node.resType.bytes);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
158 159 160 161
/**
 * Fills `param` from one expression node and merges its row count into *rowNum.
 *
 *  - QUERY_NODE_VALUE:      builds a one-row column holding the constant
 *                           (or a NULL row for TSDB_DATA_TYPE_NULL).
 *  - QUERY_NODE_NODE_LIST:  builds a hash set of the list values in
 *                           param->pHashFilter and records the param in ctx->pRes.
 *  - QUERY_NODE_COLUMN:     points param at the referenced column of the
 *                           referenced data block in ctx->pBlockList.
 *  - QUERY_NODE_LOGIC_CONDITION / QUERY_NODE_OPERATOR:
 *                           copies the result already stored in ctx->pRes.
 *  - anything else:         param is left as passed in.
 *
 * Afterwards, if param->numOfRows exceeds *rowNum, *rowNum is raised to it;
 * this is rejected when both counts are greater than one.
 *
 * @return TSDB_CODE_SUCCESS or an error code
 */
int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t *rowNum) {
  switch (nodeType(node)) {
    case QUERY_NODE_VALUE: {
      SValueNode *valueNode = (SValueNode *)node;

      param->numOfRows = 1;
      param->columnData = createColumnInfoData(&valueNode->node.resType, 1);
      if (NULL == param->columnData) {
        // Without this check colDataAppend below would dereference NULL.
        sclError("create column data for value node failed, type:%d", valueNode->node.resType.type);
        SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
      }

      if (TSDB_DATA_TYPE_NULL == valueNode->node.resType.type) {
        colDataAppend(param->columnData, 0, NULL, true);
      } else {
        colDataAppend(param->columnData, 0, nodesGetValueFromNode(valueNode), false);
      }
      break;
    }

    case QUERY_NODE_NODE_LIST: {
      SNodeListNode *nodeList = (SNodeListNode *)node;
      if (LIST_LENGTH(nodeList->pNodeList) <= 0) {
        sclError("invalid length in nodeList, length:%d", LIST_LENGTH(nodeList->pNodeList));
        SCL_RET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      SCL_ERR_RET(scalarGenerateSetFromList((void**) &param->pHashFilter, node, nodeList->dataType.type));
      if (taosHashPut(ctx->pRes, &node, POINTER_BYTES, param, sizeof(*param))) {
        taosHashCleanup(param->pHashFilter);
        param->pHashFilter = NULL;  // do not leave a dangling pointer in the caller's param
        sclError("taosHashPut nodeList failed, size:%d", (int32_t)sizeof(*param));
        return TSDB_CODE_QRY_OUT_OF_MEMORY;
      }
      break;
    }

    case QUERY_NODE_COLUMN: {
      if (NULL == ctx->pBlockList) {
        sclError("invalid node type for constant calculating, type:%d, src:%p", nodeType(node), ctx->pBlockList);
        SCL_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
      }

      SColumnNode *ref = (SColumnNode *)node;
      if (ref->dataBlockId >= taosArrayGetSize(ctx->pBlockList)) {
        sclError("column tupleId is too big, tupleId:%d, dataBlockNum:%d", ref->dataBlockId, (int32_t)taosArrayGetSize(ctx->pBlockList));
        SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      SSDataBlock *block = *(SSDataBlock **)taosArrayGet(ctx->pBlockList, ref->dataBlockId);
      if (NULL == block) {
        // Handled separately: the slot-range message below reads block->pDataBlock.
        sclError("column data block is NULL, dataBlockId:%d", ref->dataBlockId);
        SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      if (ref->slotId >= taosArrayGetSize(block->pDataBlock)) {
        sclError("column slotId is too big, slodId:%d, dataBlockSize:%d", ref->slotId, (int32_t)taosArrayGetSize(block->pDataBlock));
        SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
      }

      SColumnInfoData *columnData = (SColumnInfoData *)taosArrayGet(block->pDataBlock, ref->slotId);
      param->numOfRows = block->info.rows;
      param->columnData = columnData;
      break;
    }

    case QUERY_NODE_LOGIC_CONDITION:
    case QUERY_NODE_OPERATOR: {
      SScalarParam *res = (SScalarParam *)taosHashGet(ctx->pRes, &node, POINTER_BYTES);
      if (NULL == res) {
        sclError("no result for node, type:%d, node:%p", nodeType(node), node);
        SCL_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
      }
      *param = *res;
      break;
    }

    default:
      break;
  }

  if (param->numOfRows > *rowNum) {
    // A single-row operand may be combined with any row count; two different
    // multi-row counts cannot be reconciled.
    if ((1 != param->numOfRows) && (1 < *rowNum)) {
      sclError("different row nums, rowNum:%d, newRowNum:%d", *rowNum, param->numOfRows);
      SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    *rowNum = param->numOfRows;
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * Allocates one SScalarParam per entry of pParamList and initialises each of
 * them through sclInitParam.
 *
 * @param pParams     receives the calloc'ed array on success (caller frees)
 * @param pParamList  list of parameter nodes
 * @param ctx         scalar calculation context passed on to sclInitParam
 * @param rowNum      in/out row count, updated by sclInitParam
 * @return TSDB_CODE_SUCCESS, or an error code (the array is freed on error)
 */
int32_t sclInitParamList(SScalarParam **pParams, SNodeList* pParamList, SScalarCtx *ctx, int32_t *rowNum) {
  int32_t code = 0;
  SScalarParam *paramList = calloc(pParamList->length, sizeof(SScalarParam));
  if (NULL == paramList) {
    sclError("calloc %d failed", (int32_t)(pParamList->length * sizeof(SScalarParam)));
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SListCell *cell = pParamList->pHead;
  for (int32_t i = 0; i < pParamList->length; ++i) {
    if (NULL == cell || NULL == cell->pNode) {
      // cell itself may be NULL here, so it must not be dereferenced for logging.
      sclError("invalid cell, cell:%p, pNode:%p", cell, cell ? cell->pNode : NULL);
      SCL_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
    }

    SCL_ERR_JRET(sclInitParam(cell->pNode, &paramList[i], ctx, rowNum));
    cell = cell->pNext;
  }

  *pParams = paramList;
  return TSDB_CODE_SUCCESS;

_return:
  tfree(paramList);
  SCL_RET(code);
}

/**
 * Allocates and initialises the operand array of an operator node: the left
 * operand always, the right operand only when the operator takes two.
 *
 * @param pParams  receives the calloc'ed operand array on success (caller frees)
 * @param node     operator node providing opType, pLeft and pRight
 * @param ctx      scalar calculation context passed on to sclInitParam
 * @param rowNum   in/out row count, updated by sclInitParam
 * @return TSDB_CODE_SUCCESS, or an error code (the array is freed on error)
 */
int32_t sclInitOperatorParams(SScalarParam **pParams, SOperatorNode *node, SScalarCtx *ctx, int32_t *rowNum) {
  int32_t code = 0;
  int32_t paramNum = scalarGetOperatorParamNum(node->opType);

  bool rightMissing = (2 == paramNum) && (NULL == node->pRight);
  if (NULL == node->pLeft || rightMissing) {
    sclError("invalid operation node, left:%p, right:%p", node->pLeft, node->pRight);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SScalarParam *operands = calloc(paramNum, sizeof(SScalarParam));
  if (NULL == operands) {
    sclError("calloc %d failed", (int32_t)(paramNum * sizeof(SScalarParam)));
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  SCL_ERR_JRET(sclInitParam(node->pLeft, &operands[0], ctx, rowNum));
  if (1 < paramNum) {
    SCL_ERR_JRET(sclInitParam(node->pRight, &operands[1], ctx, rowNum));
  }

  *pParams = operands;
  return TSDB_CODE_SUCCESS;

_return:
  tfree(operands);
  SCL_RET(code);
}

D
dapan1121 已提交
292
/**
 * Evaluates a scalar function node.
 *
 * The function's parameters are initialised from node->pParameterList, the
 * output column is typed after node->node.resType and sized to the resulting
 * row count, and the function's `process` callback is invoked.
 *
 * @param node    function node to evaluate
 * @param ctx     scalar calculation context
 * @param output  result parameter; if output->columnData is NULL a column
 *                struct is allocated here, otherwise the caller's is used
 * @return TSDB_CODE_SUCCESS or an error code
 */
int32_t sclExecFuncion(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *output) {
  if (NULL == node->pParameterList || node->pParameterList->length <= 0) {
    sclError("invalid function parameter list, list:%p, paramNum:%d", node->pParameterList, node->pParameterList ? node->pParameterList->length : 0);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SScalarFuncExecFuncs ffpSet = {0};
  int32_t code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet);
  if (code) {
    sclError("fmGetFuncExecFuncs failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
    SCL_ERR_RET(code);
  }

  SScalarParam *params = NULL;
  int32_t rowNum = 0;
  bool ownsColumn = false;  // true when output->columnData was allocated below
  SCL_ERR_RET(sclInitParamList(&params, node->pParameterList, ctx, &rowNum));

  // Callers in this file hand in a zero-initialised output; make sure there
  // is a column struct to fill before it is dereferenced.
  if (NULL == output->columnData) {
    output->columnData = calloc(1, sizeof(SColumnInfoData));
    if (NULL == output->columnData) {
      sclError("calloc %d failed", (int32_t)sizeof(SColumnInfoData));
      SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
    ownsColumn = true;
  }

  output->columnData->info.type = node->node.resType.type;
  output->columnData->info.bytes = tDataTypes[node->node.resType.type].bytes;

  code = blockDataEnsureColumnCapacity(output->columnData, rowNum);
  if (code != TSDB_CODE_SUCCESS) {
    sclError("calloc %d failed", (int32_t)(rowNum * output->columnData->info.bytes));
    SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  // NOTE(review): process is called once per row with identical arguments;
  // confirm whether a single call over the whole column is intended.
  for (int32_t i = 0; i < rowNum; ++i) {
    code = (*ffpSet.process)(params, node->pParameterList->length, output);
    if (code) {
      sclError("scalar function exec failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
      SCL_ERR_JRET(code);
    }
  }

_return:

  for (int32_t i = 0; i < node->pParameterList->length; ++i) {
    sclFreeParamNoData(params + i);
  }

  tfree(params);

  if (ownsColumn && code != TSDB_CODE_SUCCESS) {
    // Do not hand a half-initialised column we created back to the caller.
    // NOTE(review): buffers attached by blockDataEnsureColumnCapacity are not
    // released here; confirm the proper column-destroy routine.
    tfree(output->columnData);
  }

  SCL_RET(code);
}

/**
 * Evaluates an AND / OR / NOT condition node row by row.
 *
 * A new boolean column is created in output->columnData (replacing whatever
 * pointer was stored there) and output->numOfRows is set to the row count
 * derived from the parameters. For each row the parameters are read in order;
 * AND stops at the first false value, OR stops at the first true value, NOT
 * negates the value it reads.
 *
 * @return TSDB_CODE_SUCCESS or an error code
 */
int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *output) {
  if (NULL == node->pParameterList || node->pParameterList->length <= 0) {
    sclError("invalid logic parameter list, list:%p, paramNum:%d", node->pParameterList, node->pParameterList ? node->pParameterList->length : 0);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (TSDB_DATA_TYPE_BOOL != node->node.resType.type) {
    sclError("invalid logic resType, type:%d", node->node.resType.type);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (LOGIC_COND_TYPE_NOT == node->condType && node->pParameterList->length > 1) {
    sclError("invalid NOT operation parameter number, paramNum:%d", node->pParameterList->length);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SScalarParam *params = NULL;
  int32_t rowNum = 0;
  int32_t code = 0;
  SCL_ERR_RET(sclInitParamList(&params, node->pParameterList, ctx, &rowNum));

  int32_t type = node->node.resType.type;
  output->numOfRows = rowNum;

  SDataType t = {.type = type, .bytes = tDataTypes[type].bytes};
  output->columnData = createColumnInfoData(&t, rowNum);
  if (NULL == output->columnData) {
    sclError("calloc %d failed", (int32_t)(rowNum * sizeof(bool)));
    SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  // `value` deliberately lives outside the row loop, as in the original flow.
  bool value = false;
  for (int32_t row = 0; row < rowNum; ++row) {
    for (int32_t m = 0; m < node->pParameterList->length; ++m) {
      char* p = colDataGetData(params[m].columnData, row);
      GET_TYPED_DATA(value, bool, params[m].columnData->info.type, p);

      bool andDecided = (LOGIC_COND_TYPE_AND == node->condType) && (false == value);
      bool orDecided  = (LOGIC_COND_TYPE_OR == node->condType) && value;
      if (andDecided || orDecided) {
        break;  // the remaining operands cannot change the outcome
      }

      if (LOGIC_COND_TYPE_NOT == node->condType) {
        value = !value;
      }
    }

    colDataAppend(output->columnData, row, (char*) &value, false);
  }

_return:
  for (int32_t i = 0; i < node->pParameterList->length; ++i) {
    sclFreeParamNoData(params + i);
  }

  tfree(params);
  SCL_RET(code);
}

/**
 * Evaluates an operator node.
 *
 * The operands are initialised through sclInitOperatorParams, a result column
 * typed after node->node.resType is created in output->columnData (replacing
 * whatever pointer was stored there), and the operator function returned by
 * getBinScalarOperatorFn is applied in ascending order.
 *
 * @return TSDB_CODE_SUCCESS or an error code
 */
int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *output) {
  SScalarParam *params = NULL;
  int32_t rowNum = 0;
  int32_t code = 0;
  // Computed before any `goto _return`: the cleanup loop reads it, and a jump
  // over its initialisation would leave it indeterminate.
  int32_t paramNum = scalarGetOperatorParamNum(node->opType);

  SCL_ERR_RET(sclInitOperatorParams(&params, node, ctx, &rowNum));
  output->columnData = createColumnInfoData(&node->node.resType, rowNum);
  if (output->columnData == NULL) {
    sclError("calloc failed, size:%d", (int32_t)rowNum * node->node.resType.bytes);
    SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  _bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(node->opType);

  SScalarParam* pLeft = &params[0];
  SScalarParam* pRight = paramNum > 1 ? &params[1] : NULL;  // unary operators have no right operand

  OperatorFn(pLeft, pRight, output, TSDB_ORDER_ASC);

_return:
  for (int32_t i = 0; i < paramNum; ++i) {
    sclFreeParamNoData(params + i);
  }

  tfree(params);
  SCL_RET(code);
}

D
dapan 已提交
423
/**
 * Replaces a function node by a value node holding its computed result.
 *
 * The function is evaluated through sclExecFuncion; on success a new
 * QUERY_NODE_VALUE node with the same resType takes the place of *pNode and
 * the old node is destroyed. Variable-length results hand their data buffer
 * over to the value node, fixed-length results are copied.
 *
 * @return DEAL_RES_CONTINUE on success, DEAL_RES_ERROR with ctx->code set otherwise
 */
EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) {
  SFunctionNode *node = (SFunctionNode *)*pNode;

  // sclExecFuncion writes through output.columnData, so it must exist first.
  SScalarParam output = {.columnData = calloc(1, sizeof(SColumnInfoData))};
  if (NULL == output.columnData) {
    sclError("calloc %d failed", (int32_t)sizeof(SColumnInfoData));
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  ctx->code = sclExecFuncion(node, ctx, &output);
  if (ctx->code) {
    sclFreeParam(&output);
    // NOTE(review): only the column struct is released; confirm the proper
    // column-destroy routine for buffers the execution may have attached.
    tfree(output.columnData);
    return DEAL_RES_ERROR;
  }

  SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
  if (NULL == res) {
    sclError("make value node failed");
    sclFreeParam(&output);
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  res->node.resType = node->node.resType;

  int32_t type = output.columnData->info.type;
  if (IS_VAR_DATA_TYPE(type)) {
    // Ownership of the buffer moves to the value node.
    res->datum.p = output.columnData->pData;
    output.columnData->pData = NULL;
  } else {
    memcpy(nodesGetValueFromNode(res), output.columnData->pData, tDataTypes[type].bytes);
  }

  nodesDestroyNode(*pNode);
  *pNode = (SNode*)res;

  sclFreeParam(&output);

  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
458
/**
 * Replaces a logic-condition node by a value node holding its computed result.
 *
 * The condition is evaluated through sclExecLogic, which creates the result
 * column itself; on success a new QUERY_NODE_VALUE node with the same resType
 * takes the place of *pNode and the old node is destroyed.
 *
 * @return DEAL_RES_CONTINUE on success, DEAL_RES_ERROR with ctx->code set otherwise
 */
EDealRes sclRewriteLogic(SNode** pNode, SScalarCtx *ctx) {
  SLogicConditionNode *node = (SLogicConditionNode *)*pNode;

  // No column is pre-allocated: sclExecLogic assigns output.columnData itself,
  // so anything allocated here would be overwritten and leaked.
  SScalarParam output = {0};
  ctx->code = sclExecLogic(node, ctx, &output);
  if (ctx->code) {
    return DEAL_RES_ERROR;
  }

  SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
  if (NULL == res) {
    sclError("make value node failed");
    sclFreeParam(&output);
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  res->node.resType = node->node.resType;

  int32_t type = output.columnData->info.type;
  if (IS_VAR_DATA_TYPE(type)) {
    // Ownership of the buffer moves to the value node.
    res->datum.p = output.columnData->pData;
    output.columnData->pData = NULL;
  } else {
    memcpy(nodesGetValueFromNode(res), output.columnData->pData, tDataTypes[type].bytes);
  }

  nodesDestroyNode(*pNode);
  *pNode = (SNode*)res;

  sclFreeParam(&output);

  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
493
/**
 * Replaces an operator node by a value node holding its computed result.
 *
 * The operator is evaluated through sclExecOperator, which creates the result
 * column itself; on success a new QUERY_NODE_VALUE node with the same resType
 * takes the place of *pNode and the old node is destroyed.
 *
 * @return DEAL_RES_CONTINUE on success, DEAL_RES_ERROR with ctx->code set otherwise
 */
EDealRes sclRewriteOperator(SNode** pNode, SScalarCtx *ctx) {
  SOperatorNode *node = (SOperatorNode *)*pNode;

  // No column is pre-allocated: sclExecOperator assigns output.columnData
  // itself, so anything allocated here would be overwritten and leaked.
  SScalarParam output = {0};
  ctx->code = sclExecOperator(node, ctx, &output);
  if (ctx->code) {
    return DEAL_RES_ERROR;
  }

  SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
  if (NULL == res) {
    sclError("make value node failed");
    sclFreeParam(&output);
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  res->node.resType = node->node.resType;

  int32_t type = output.columnData->info.type;
  if (IS_VAR_DATA_TYPE(type)) {  // todo refactor
    // Ownership of the buffer moves to the value node.
    res->datum.p = output.columnData->pData;
    output.columnData->pData = NULL;
  } else {
    memcpy(nodesGetValueFromNode(res), output.columnData->pData, tDataTypes[type].bytes);
  }

  nodesDestroyNode(*pNode);
  *pNode = (SNode*)res;

  sclFreeParam(&output);
  return DEAL_RES_CONTINUE;
}

/**
 * Rewrite callback used for constant folding.
 *
 * Value and node-list nodes are left as they are; function, logic-condition
 * and operator nodes are handed to their sclRewrite* routine. Any other node
 * type is reported as invalid input through ctx->code.
 *
 * @param pNode     node slot being visited (may be replaced)
 * @param pContext  the SScalarCtx of the current calculation
 */
EDealRes sclConstantsRewriter(SNode** pNode, void* pContext) {
  SScalarCtx *ctx = (SScalarCtx *)pContext;

  switch (nodeType(*pNode)) {
    case QUERY_NODE_VALUE:
    case QUERY_NODE_NODE_LIST:
      return DEAL_RES_CONTINUE;

    case QUERY_NODE_FUNCTION:
      return sclRewriteFunction(pNode, ctx);

    case QUERY_NODE_LOGIC_CONDITION:
      return sclRewriteLogic(pNode, ctx);

    case QUERY_NODE_OPERATOR:
      return sclRewriteOperator(pNode, ctx);

    default:
      break;
  }

  sclError("invalid node type for calculating constants, type:%d", nodeType(*pNode));
  ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
  return DEAL_RES_ERROR;
}

D
dapan 已提交
551
/**
 * Evaluates a function node during a calculation walk and stores the result
 * in ctx->pRes, keyed by the node pointer.
 *
 * @return DEAL_RES_CONTINUE on success, DEAL_RES_ERROR with ctx->code set otherwise
 */
EDealRes sclWalkFunction(SNode* pNode, SScalarCtx *ctx) {
  SFunctionNode *node = (SFunctionNode *)pNode;

  // sclExecFuncion writes through output.columnData, so it must exist first.
  SScalarParam output = {.columnData = calloc(1, sizeof(SColumnInfoData))};
  if (NULL == output.columnData) {
    sclError("calloc %d failed", (int32_t)sizeof(SColumnInfoData));
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  ctx->code = sclExecFuncion(node, ctx, &output);
  if (ctx->code) {
    // NOTE(review): only the column struct is released; confirm the proper
    // column-destroy routine for buffers the execution may have attached.
    tfree(output.columnData);
    return DEAL_RES_ERROR;
  }

  // The hash stores a copy of `output`, so the column pointer lives on there.
  if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
568
/**
 * Evaluates a logic-condition node during a calculation walk and stores the
 * result in ctx->pRes, keyed by the node pointer.
 *
 * @return DEAL_RES_CONTINUE on success, DEAL_RES_ERROR with ctx->code set otherwise
 */
EDealRes sclWalkLogic(SNode* pNode, SScalarCtx *ctx) {
  SScalarParam output = {0};

  ctx->code = sclExecLogic((SLogicConditionNode *)pNode, ctx, &output);
  if (0 != ctx->code) {
    return DEAL_RES_ERROR;
  }

  if (0 != taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
585
/**
 * Evaluates an operator node during a calculation walk and stores the result
 * in ctx->pRes, keyed by the node pointer.
 *
 * @return DEAL_RES_CONTINUE on success, DEAL_RES_ERROR with ctx->code set otherwise
 */
EDealRes sclWalkOperator(SNode* pNode, SScalarCtx *ctx) {
  SScalarParam output = {0};

  ctx->code = sclExecOperator((SOperatorNode *)pNode, ctx, &output);
  if (0 != ctx->code) {
    return DEAL_RES_ERROR;
  }

  if (0 != taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
602 603 604
/**
 * Copies the computed result of a target's expression into the target column.
 *
 * The destination is the column at target->slotId of the data block at
 * target->dataBlockId in ctx->pBlockList. The expression's result is looked
 * up in ctx->pRes, copied row by row (NULL rows stay NULL) and then removed
 * from the hash.
 *
 * @return DEAL_RES_CONTINUE on success, DEAL_RES_ERROR with ctx->code set otherwise
 */
EDealRes sclWalkTarget(SNode* pNode, SScalarCtx *ctx) {
  STargetNode *target = (STargetNode *)pNode;

  if (target->dataBlockId >= taosArrayGetSize(ctx->pBlockList)) {
    sclError("target tupleId is too big, tupleId:%d, dataBlockNum:%d", target->dataBlockId, (int32_t)taosArrayGetSize(ctx->pBlockList));
    ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
    return DEAL_RES_ERROR;
  }

  SSDataBlock *block = *(SSDataBlock **)taosArrayGet(ctx->pBlockList, target->dataBlockId);
  if (NULL == block) {
    // Same guard as the column lookup in sclInitParam: the list may hold a NULL block.
    sclError("target data block is NULL, dataBlockId:%d", target->dataBlockId);
    ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
    return DEAL_RES_ERROR;
  }

  if (target->slotId >= taosArrayGetSize(block->pDataBlock)) {
    sclError("target slot not exist, dataBlockId:%d, slotId:%d, dataBlockNum:%d", target->dataBlockId, target->slotId, (int32_t)taosArrayGetSize(block->pDataBlock));
    ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
    return DEAL_RES_ERROR;
  }

  SColumnInfoData *col = taosArrayGet(block->pDataBlock, target->slotId);

  SScalarParam *res = (SScalarParam *)taosHashGet(ctx->pRes, (void *)&target->pExpr, POINTER_BYTES);
  if (NULL == res) {
    sclError("no valid res in hash, node:%p, type:%d", target->pExpr, nodeType(target->pExpr));
    ctx->code = TSDB_CODE_QRY_APP_ERROR;
    return DEAL_RES_ERROR;
  }

  for (int32_t i = 0; i < res->numOfRows; ++i) {
    if (colDataIsNull(res->columnData, res->numOfRows, i, NULL)) {
      colDataAppend(col, i, NULL, true);
    } else {
      char *p = colDataGetData(res->columnData, i);
      colDataAppend(col, i, p, false);
    }
  }

  sclFreeParam(res);
  taosHashRemove(ctx->pRes, (void *)&target->pExpr, POINTER_BYTES);
  return DEAL_RES_CONTINUE;
}

D
dapan1121 已提交
641
/**
 * Walk callback used for scalar calculation over data blocks.
 *
 * Value, node-list and column nodes need no work of their own; function,
 * logic-condition, operator and target nodes are handed to their sclWalk*
 * routine. Any other node type is reported as invalid input through ctx->code.
 *
 * @param pNode     node being visited
 * @param pContext  the SScalarCtx of the current calculation
 */
EDealRes sclCalcWalker(SNode* pNode, void* pContext) {
  SScalarCtx *ctx = (SScalarCtx *)pContext;

  switch (nodeType(pNode)) {
    case QUERY_NODE_VALUE:
    case QUERY_NODE_NODE_LIST:
    case QUERY_NODE_COLUMN:
      return DEAL_RES_CONTINUE;

    case QUERY_NODE_FUNCTION:
      return sclWalkFunction(pNode, ctx);

    case QUERY_NODE_LOGIC_CONDITION:
      return sclWalkLogic(pNode, ctx);

    case QUERY_NODE_OPERATOR:
      return sclWalkOperator(pNode, ctx);

    case QUERY_NODE_TARGET:
      return sclWalkTarget(pNode, ctx);

    default:
      break;
  }

  sclError("invalid node type for scalar calculating, type:%d", nodeType(pNode));
  ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
  return DEAL_RES_ERROR;
}

/**
 * Folds the constant sub-expressions of a node tree.
 *
 * The tree is rewritten in post-order with sclConstantsRewriter; on success
 * the (possibly replaced) root is returned through *pRes.
 *
 * @param pNode  root of the expression tree; must not be NULL
 * @param pRes   receives the rewritten root on success
 * @return TSDB_CODE_SUCCESS or an error code
 */
int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes) {
  if (NULL == pNode) {
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t code = 0;
  SScalarCtx ctx = {0};

  // Intermediate results are kept in a hash for the duration of the rewrite.
  ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (NULL == ctx.pRes) {
    sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM);
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  nodesRewriteNodePostOrder(&pNode, sclConstantsRewriter, (void *)&ctx);
  SCL_ERR_JRET(ctx.code);

  *pRes = pNode;

_return:
  sclFreeRes(ctx.pRes);
  return code;
}

D
dapan 已提交
690
/**
 * Evaluates an expression tree against a list of data blocks.
 *
 * The tree is walked in post-order with sclCalcWalker. When pDst is given,
 * the result recorded for the root node is copied into pDst->columnData,
 * pDst->numOfRows is set, and the entry is removed from the result hash.
 * pNode itself is not destroyed here.
 *
 * @param pNode       root of the expression tree; must not be NULL
 * @param pBlockList  array of data blocks referenced by the tree; must not be NULL
 * @param pDst        optional destination for the root result
 * @return TSDB_CODE_SUCCESS or an error code
 */
int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
  if (NULL == pNode || NULL == pBlockList) {
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t code = 0;
  SScalarCtx ctx = {.code = 0, .pBlockList = pBlockList};

  ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (NULL == ctx.pRes) {
    sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM);
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  nodesWalkNodePostOrder(pNode, sclCalcWalker, (void *)&ctx);
  SCL_ERR_JRET(ctx.code);

  if (NULL != pDst) {
    SScalarParam *pRootRes = (SScalarParam *)taosHashGet(ctx.pRes, (void *)&pNode, POINTER_BYTES);
    if (NULL == pRootRes) {
      sclError("no valid res in hash, node:%p, type:%d", pNode, nodeType(pNode));
      SCL_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }

    colDataAssign(pDst->columnData, pRootRes->columnData, pRootRes->numOfRows);
    pDst->numOfRows = pRootRes->numOfRows;
    taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES);
  }

_return:
  sclFreeRes(ctx.pRes);
  return code;
}