scalar.c 36.9 KB
Newer Older
D
dapan1121 已提交
1 2
#include "function.h"
#include "functionMgt.h"
H
Haojun Liao 已提交
3 4
#include "nodes.h"
#include "querynodes.h"
D
dapan1121 已提交
5
#include "sclInt.h"
H
Haojun Liao 已提交
6 7 8
#include "sclvector.h"
#include "tcommon.h"
#include "tdatablock.h"
9
#include "scalar.h"
S
slzhou 已提交
10
#include "tudf.h"
D
fix bug  
dapan1121 已提交
11
#include "ttime.h"
D
dapan1121 已提交
12

D
dapan1121 已提交
13
int32_t scalarGetOperatorParamNum(EOperatorType type) {
G
Ganlin Zhao 已提交
14
  if (OP_TYPE_IS_NULL == type || OP_TYPE_IS_NOT_NULL == type || OP_TYPE_IS_TRUE == type || OP_TYPE_IS_NOT_TRUE == type
D
dapan1121 已提交
15 16
   || OP_TYPE_IS_FALSE == type || OP_TYPE_IS_NOT_FALSE == type || OP_TYPE_IS_UNKNOWN == type || OP_TYPE_IS_NOT_UNKNOWN == type
   || OP_TYPE_MINUS == type) {
D
dapan1121 已提交
17 18 19 20 21 22
    return 1;
  }

  return 2;
}

D
dapan1121 已提交
23
int32_t sclConvertToTsValueNode(int8_t precision, SValueNode* valueNode) {
D
dapan 已提交
24
  char *timeStr = valueNode->datum.p;
D
dapan1121 已提交
25 26 27
  int32_t code = convertStringToTimestamp(valueNode->node.resType.type, valueNode->datum.p, precision, &valueNode->datum.i);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
D
dapan 已提交
28 29
  }
  taosMemoryFree(timeStr);
D
dapan1121 已提交
30
  valueNode->typeData = valueNode->datum.i;
G
Ganlin Zhao 已提交
31

D
dapan 已提交
32 33
  valueNode->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
  valueNode->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes;
D
dapan1121 已提交
34 35

  return TSDB_CODE_SUCCESS;
D
dapan 已提交
36 37
}

H
Haojun Liao 已提交
38
int32_t sclCreateColumnInfoData(SDataType* pType, int32_t numOfRows, SScalarParam* pParam) {
H
Haojun Liao 已提交
39
  SColumnInfoData* pColumnData = taosMemoryCalloc(1, sizeof(SColumnInfoData));
40 41
  if (pColumnData == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
H
Haojun Liao 已提交
42
    return terrno;
43 44 45 46 47 48 49
  }

  pColumnData->info.type      = pType->type;
  pColumnData->info.bytes     = pType->bytes;
  pColumnData->info.scale     = pType->scale;
  pColumnData->info.precision = pType->precision;

50
  int32_t code = colInfoDataEnsureCapacity(pColumnData, numOfRows);
51 52
  if (code != TSDB_CODE_SUCCESS) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
H
Haojun Liao 已提交
53
    taosMemoryFree(pColumnData);
H
Haojun Liao 已提交
54
    return terrno;
55
  }
H
Haojun Liao 已提交
56 57

  pParam->columnData = pColumnData;
D
dapan1121 已提交
58
  pParam->colAlloced = true;
H
Haojun Liao 已提交
59
  return TSDB_CODE_SUCCESS;
60 61
}

D
dapan1121 已提交
62
int32_t doConvertDataType(SValueNode* pValueNode, SScalarParam* out, int32_t* overflow) {
63
  SScalarParam in = {.numOfRows = 1};
H
Haojun Liao 已提交
64 65 66 67 68
  int32_t code = sclCreateColumnInfoData(&pValueNode->node.resType, 1, &in);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

69
  colDataAppend(in.columnData, 0, nodesGetValueFromNode(pValueNode), false);
70

71
  colInfoDataEnsureCapacity(out->columnData, 1);
D
dapan1121 已提交
72
  code = vectorConvertImpl(&in, out, overflow);
73 74 75
  sclFreeParam(&in);

  return code;
76 77
}

D
dapan1121 已提交
78 79 80 81 82 83 84
int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type) {
  SHashObj *pObj = taosHashInit(256, taosGetDefaultHashFunction(type), true, false);
  if (NULL == pObj) {
    sclError("taosHashInit failed, size:%d", 256);
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

G
Ganlin Zhao 已提交
85
  taosHashSetEqualFp(pObj, taosGetDefaultEqualFunction(type));
D
dapan1121 已提交
86 87 88 89

  int32_t code = 0;
  SNodeListNode *nodeList = (SNodeListNode *)pNode;
  SListCell *cell = nodeList->pNodeList->pHead;
H
Haojun Liao 已提交
90
  SScalarParam out = {.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData))};
91

D
dapan1121 已提交
92 93
  int32_t len = 0;
  void *buf = NULL;
G
Ganlin Zhao 已提交
94

D
dapan1121 已提交
95 96
  for (int32_t i = 0; i < nodeList->pNodeList->length; ++i) {
    SValueNode *valueNode = (SValueNode *)cell->pNode;
G
Ganlin Zhao 已提交
97

D
dapan1121 已提交
98
    if (valueNode->node.resType.type != type) {
99
      out.columnData->info.type = type;
D
dapan1121 已提交
100 101 102 103 104 105 106 107 108
      if (IS_VAR_DATA_TYPE(type)) {
        if (IS_VAR_DATA_TYPE(valueNode->node.resType.type)) {
          out.columnData->info.bytes = valueNode->node.resType.bytes * TSDB_NCHAR_SIZE;
        } else {
          out.columnData->info.bytes = 64 * TSDB_NCHAR_SIZE;
        }
      } else {
        out.columnData->info.bytes = tDataTypes[type].bytes;
      }
109

D
dapan1121 已提交
110 111
      int32_t overflow = 0;
      code = doConvertDataType(valueNode, &out, &overflow);
112
      if (code != TSDB_CODE_SUCCESS) {
113
//        sclError("convert data from %d to %d failed", in.type, out.type);
D
dapan1121 已提交
114 115 116
        SCL_ERR_JRET(code);
      }

D
dapan1121 已提交
117 118 119 120 121
      if (overflow) {
        cell = cell->pNext;
        continue;
      }

D
dapan1121 已提交
122
      if (IS_VAR_DATA_TYPE(type)) {
123
        buf = colDataGetVarData(out.columnData, 0);
D
dapan1121 已提交
124
        len = varDataTLen(buf);
D
dapan1121 已提交
125 126
      } else {
        len = tDataTypes[type].bytes;
127
        buf = out.columnData->pData;
D
dapan1121 已提交
128 129 130
      }
    } else {
      buf = nodesGetValueFromNode(valueNode);
D
dapan1121 已提交
131
      if (IS_VAR_DATA_TYPE(type)) {
132
        len = varDataTLen(buf);
D
dapan1121 已提交
133 134
      } else {
        len = valueNode->node.resType.bytes;
135
      }
D
dapan1121 已提交
136
    }
G
Ganlin Zhao 已提交
137

138
    if (taosHashPut(pObj, buf, (size_t)len, NULL, 0)) {
D
dapan1121 已提交
139
      sclError("taosHashPut to set failed");
D
dapan1121 已提交
140 141
      SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
    }
D
dapan1121 已提交
142

D
dapan1121 已提交
143 144 145 146
    colDataDestroy(out.columnData);
    taosMemoryFreeClear(out.columnData);
    out.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData));

D
dapan1121 已提交
147
    cell = cell->pNext;
D
dapan1121 已提交
148 149 150
  }

  *data = pObj;
D
dapan1121 已提交
151 152 153

  colDataDestroy(out.columnData);
  taosMemoryFreeClear(out.columnData);
D
dapan1121 已提交
154 155 156
  return TSDB_CODE_SUCCESS;

_return:
D
dapan1121 已提交
157 158 159

  colDataDestroy(out.columnData);
  taosMemoryFreeClear(out.columnData);
D
dapan1121 已提交
160 161 162 163
  taosHashCleanup(pObj);
  SCL_RET(code);
}

D
dapan1121 已提交
164 165 166 167 168 169 170
void sclFreeRes(SHashObj *res) {
  SScalarParam *p = NULL;
  void *pIter = taosHashIterate(res, NULL);
  while (pIter) {
    p = (SScalarParam *)pIter;

    if (p) {
D
dapan 已提交
171
      sclFreeParam(p);
D
dapan1121 已提交
172 173 174 175 176 177
    }
    pIter = taosHashIterate(res, pIter);
  }
  taosHashCleanup(res);
}

D
dapan1121 已提交
178
void sclFreeParam(SScalarParam *param) {
D
dapan1121 已提交
179 180 181 182 183
  if (!param->colAlloced) {
    return;
  }
  
  if (param->columnData != NULL) {
184
    colDataDestroy(param->columnData);
185
    taosMemoryFreeClear(param->columnData);
186 187 188 189
  }

  if (param->pHashFilter != NULL) {
    taosHashCleanup(param->pHashFilter);
D
dapan1121 已提交
190
    param->pHashFilter = NULL;
191
  }
D
dapan1121 已提交
192 193
}

D
dapan1121 已提交
194 195 196 197
int32_t sclCopyValueNodeValue(SValueNode *pNode, void **res) {
  if (TSDB_DATA_TYPE_NULL == pNode->node.resType.type) {
    return TSDB_CODE_SUCCESS;
  }
G
Ganlin Zhao 已提交
198

wafwerar's avatar
wafwerar 已提交
199
  *res = taosMemoryMalloc(pNode->node.resType.bytes);
D
dapan1121 已提交
200 201 202 203 204 205 206 207 208
  if (NULL == (*res)) {
    sclError("malloc %d failed", pNode->node.resType.bytes);
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  memcpy(*res, nodesGetValueFromNode(pNode), pNode->node.resType.bytes);
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
209 210 211 212 213 214 215 216 217 218 219 220 221
void sclFreeParamList(SScalarParam *param, int32_t paramNum) {
  if (NULL == param) {
    return;
  }

  for (int32_t i = 0; i < paramNum; ++i) {
    SScalarParam* p = param + i;
    sclFreeParam(p);
  }

  taosMemoryFree(param);
}

D
dapan1121 已提交
222 223
int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t *rowNum) {
  switch (nodeType(node)) {
D
dapan1121 已提交
224 225 226 227 228
    case QUERY_NODE_LEFT_VALUE: {
      SSDataBlock* pb = taosArrayGetP(ctx->pBlockList, 0);
      param->numOfRows = pb->info.rows;
      break;
    }
D
dapan1121 已提交
229 230
    case QUERY_NODE_VALUE: {
      SValueNode *valueNode = (SValueNode *)node;
231

H
Haojun Liao 已提交
232
      ASSERT(param->columnData == NULL);
233
      param->numOfRows = 1;
H
Haojun Liao 已提交
234
      /*int32_t code = */sclCreateColumnInfoData(&valueNode->node.resType, 1, param);
wmmhello's avatar
wmmhello 已提交
235
      if (TSDB_DATA_TYPE_NULL == valueNode->node.resType.type || valueNode->isNull) {
236
        colDataAppendNULL(param->columnData, 0);
237 238
      } else {
        colDataAppend(param->columnData, 0, nodesGetValueFromNode(valueNode), false);
D
dapan1121 已提交
239
      }
D
dapan1121 已提交
240 241
      break;
    }
D
dapan1121 已提交
242 243
    case QUERY_NODE_NODE_LIST: {
      SNodeListNode *nodeList = (SNodeListNode *)node;
244 245
      if (LIST_LENGTH(nodeList->pNodeList) <= 0) {
        sclError("invalid length in nodeList, length:%d", LIST_LENGTH(nodeList->pNodeList));
D
dapan1121 已提交
246 247 248
        SCL_RET(TSDB_CODE_QRY_INVALID_INPUT);
      }

D
dapan1121 已提交
249 250 251 252
      int32_t type = vectorGetConvertType(ctx->type.selfType, ctx->type.peerType);
      if (type == 0) {
        type = nodeList->dataType.type;
      }
G
Ganlin Zhao 已提交
253

D
dapan1121 已提交
254 255
      SCL_ERR_RET(scalarGenerateSetFromList((void **)&param->pHashFilter, node, type));
      param->hashValueType = type;
D
dapan1121 已提交
256
      param->colAlloced = true;
D
dapan 已提交
257
      if (taosHashPut(ctx->pRes, &node, POINTER_BYTES, param, sizeof(*param))) {
258
        taosHashCleanup(param->pHashFilter);
D
dapan1121 已提交
259
        param->pHashFilter = NULL;
D
dapan 已提交
260 261
        sclError("taosHashPut nodeList failed, size:%d", (int32_t)sizeof(*param));
        return TSDB_CODE_QRY_OUT_OF_MEMORY;
G
Ganlin Zhao 已提交
262
      }
D
dapan1121 已提交
263
      param->colAlloced = false;
D
dapan1121 已提交
264 265
      break;
    }
X
Xiaoyu Wang 已提交
266
    case QUERY_NODE_COLUMN: {
D
dapan 已提交
267 268
      if (NULL == ctx->pBlockList) {
        sclError("invalid node type for constant calculating, type:%d, src:%p", nodeType(node), ctx->pBlockList);
D
dapan1121 已提交
269 270
        SCL_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
      }
G
Ganlin Zhao 已提交
271

X
Xiaoyu Wang 已提交
272
      SColumnNode *ref = (SColumnNode *)node;
273 274 275 276 277 278 279 280 281 282 283

      int32_t index = -1;
      for(int32_t i = 0; i < taosArrayGetSize(ctx->pBlockList); ++i) {
        SSDataBlock* pb = taosArrayGetP(ctx->pBlockList, i);
        if (pb->info.blockId == ref->dataBlockId) {
          index = i;
          break;
        }
      }

      if (index == -1) {
D
dapan1121 已提交
284
        sclError("column tupleId is too big, tupleId:%d, dataBlockNum:%d", ref->dataBlockId, (int32_t)taosArrayGetSize(ctx->pBlockList));
D
dapan 已提交
285 286 287
        SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
      }

288
      SSDataBlock *block = *(SSDataBlock **)taosArrayGet(ctx->pBlockList, index);
289
      if (NULL == block || ref->slotId >= taosArrayGetSize(block->pDataBlock)) {
D
dapan 已提交
290
        sclError("column slotId is too big, slodId:%d, dataBlockSize:%d", ref->slotId, (int32_t)taosArrayGetSize(block->pDataBlock));
D
dapan1121 已提交
291 292 293
        SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
      }

D
dapan 已提交
294
      SColumnInfoData *columnData = (SColumnInfoData *)taosArrayGet(block->pDataBlock, ref->slotId);
295 296 297
#if TAG_FILTER_DEBUG
      qDebug("tagfilter column info, slotId:%d, colId:%d, type:%d", ref->slotId, columnData->info.colId, columnData->info.type);
#endif
298 299
      param->numOfRows = block->info.rows;
      param->columnData = columnData;
D
dapan1121 已提交
300 301
      break;
    }
302 303 304
    case QUERY_NODE_FUNCTION:
    case QUERY_NODE_OPERATOR:
    case QUERY_NODE_LOGIC_CONDITION: {
D
dapan1121 已提交
305 306 307 308 309 310
      SScalarParam *res = (SScalarParam *)taosHashGet(ctx->pRes, &node, POINTER_BYTES);
      if (NULL == res) {
        sclError("no result for node, type:%d, node:%p", nodeType(node), node);
        SCL_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
      }
      *param = *res;
D
dapan1121 已提交
311
      param->colAlloced = false;
D
dapan1121 已提交
312 313
      break;
    }
314 315
    default:
      break;
D
dapan1121 已提交
316 317
  }

318 319 320
  if (param->numOfRows > *rowNum) {
    if ((1 != param->numOfRows) && (1 < *rowNum)) {
      sclError("different row nums, rowNum:%d, newRowNum:%d", *rowNum, param->numOfRows);
D
dapan1121 已提交
321 322
      SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
    }
G
Ganlin Zhao 已提交
323

324
    *rowNum = param->numOfRows;
D
dapan1121 已提交
325 326
  }

327
  param->param = ctx->param;
D
dapan1121 已提交
328 329 330
  return TSDB_CODE_SUCCESS;
}

G
Ganlin Zhao 已提交
331
int32_t sclInitParamList(SScalarParam **pParams, SNodeList* pParamList, SScalarCtx *ctx, int32_t *paramNum, int32_t *rowNum) {
D
dapan1121 已提交
332
  int32_t code = 0;
D
dapan1121 已提交
333 334
  if (NULL == pParamList) {
    if (ctx->pBlockList) {
D
dapan1121 已提交
335
      SSDataBlock *pBlock = taosArrayGetP(ctx->pBlockList, 0);
D
dapan1121 已提交
336 337 338 339 340
      *rowNum = pBlock->info.rows;
    } else {
      *rowNum = 1;
    }

D
dapan 已提交
341
    *paramNum = 1;
D
dapan1121 已提交
342
  } else {
D
dapan 已提交
343
    *paramNum = pParamList->length;
D
dapan1121 已提交
344 345
  }

D
dapan 已提交
346
  SScalarParam *paramList = taosMemoryCalloc(*paramNum, sizeof(SScalarParam));
D
dapan1121 已提交
347
  if (NULL == paramList) {
D
dapan 已提交
348
    sclError("calloc %d failed", (int32_t)((*paramNum) * sizeof(SScalarParam)));
D
dapan1121 已提交
349
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
D
dapan1121 已提交
350 351
  }

D
dapan1121 已提交
352 353 354 355
  if (pParamList) {
    SNode *tnode = NULL;
    int32_t i = 0;
    if (SCL_IS_CONST_CALC(ctx)) {
G
Ganlin Zhao 已提交
356
      WHERE_EACH (tnode, pParamList) {
D
dapan1121 已提交
357
        if (!SCL_IS_CONST_NODE(tnode)) {
D
dapan 已提交
358
          WHERE_NEXT;
D
dapan1121 已提交
359 360 361 362
        } else {
          SCL_ERR_JRET(sclInitParam(tnode, &paramList[i], ctx, rowNum));
          ERASE_NODE(pParamList);
        }
G
Ganlin Zhao 已提交
363

D
dapan1121 已提交
364 365 366
        ++i;
      }
    } else {
G
Ganlin Zhao 已提交
367
      FOREACH(tnode, pParamList) {
D
dapan1121 已提交
368 369 370
        SCL_ERR_JRET(sclInitParam(tnode, &paramList[i], ctx, rowNum));
        ++i;
      }
D
dapan1121 已提交
371
    }
D
dapan1121 已提交
372 373 374
  } else {
    paramList[0].numOfRows = *rowNum;
  }
D
dapan1121 已提交
375

D
dapan1121 已提交
376
  if (0 == *rowNum) {
G
Ganlin Zhao 已提交
377
    taosMemoryFreeClear(paramList);
D
dapan1121 已提交
378
  }
D
dapan1121 已提交
379

D
dapan1121 已提交
380
  *pParams = paramList;
D
dapan1121 已提交
381
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
382

D
dapan1121 已提交
383
_return:
wafwerar's avatar
wafwerar 已提交
384
  taosMemoryFreeClear(paramList);
D
dapan1121 已提交
385 386 387
  SCL_RET(code);
}

D
dapan1121 已提交
388 389 390 391
int32_t sclGetNodeType(SNode *pNode, SScalarCtx *ctx) {
  if (NULL == pNode) {
    return -1;
  }
G
Ganlin Zhao 已提交
392

wafwerar's avatar
wafwerar 已提交
393
  switch ((int)nodeType(pNode)) {
D
dapan1121 已提交
394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427
    case QUERY_NODE_VALUE: {
      SValueNode *valueNode = (SValueNode *)pNode;
      return valueNode->node.resType.type;
    }
    case QUERY_NODE_NODE_LIST: {
      SNodeListNode *nodeList = (SNodeListNode *)pNode;
      return nodeList->dataType.type;
    }
    case QUERY_NODE_COLUMN: {
      SColumnNode *colNode = (SColumnNode *)pNode;
      return colNode->node.resType.type;
    }
    case QUERY_NODE_FUNCTION:
    case QUERY_NODE_OPERATOR:
    case QUERY_NODE_LOGIC_CONDITION: {
      SScalarParam *res = (SScalarParam *)taosHashGet(ctx->pRes, &pNode, POINTER_BYTES);
      if (NULL == res) {
        sclError("no result for node, type:%d, node:%p", nodeType(pNode), pNode);
        return -1;
      }
      return res->columnData->info.type;
    }
  }

  return -1;
}


void sclSetOperatorValueType(SOperatorNode *node, SScalarCtx *ctx) {
  ctx->type.opResType = node->node.resType.type;
  ctx->type.selfType = sclGetNodeType(node->pLeft, ctx);
  ctx->type.peerType = sclGetNodeType(node->pRight, ctx);
}

D
dapan1121 已提交
428 429
int32_t sclInitOperatorParams(SScalarParam **pParams, SOperatorNode *node, SScalarCtx *ctx, int32_t *rowNum) {
  int32_t code = 0;
D
dapan1121 已提交
430
  int32_t paramNum = scalarGetOperatorParamNum(node->opType);
D
dapan1121 已提交
431 432 433 434
  if (NULL == node->pLeft || (paramNum == 2 && NULL == node->pRight)) {
    sclError("invalid operation node, left:%p, right:%p", node->pLeft, node->pRight);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }
G
Ganlin Zhao 已提交
435

wafwerar's avatar
wafwerar 已提交
436
  SScalarParam *paramList = taosMemoryCalloc(paramNum, sizeof(SScalarParam));
D
dapan1121 已提交
437 438
  if (NULL == paramList) {
    sclError("calloc %d failed", (int32_t)(paramNum * sizeof(SScalarParam)));
D
dapan1121 已提交
439 440 441
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

D
dapan1121 已提交
442 443
  sclSetOperatorValueType(node, ctx);

D
dapan1121 已提交
444
  SCL_ERR_JRET(sclInitParam(node->pLeft, &paramList[0], ctx, rowNum));
D
dapan1121 已提交
445
  if (paramNum > 1) {
D
dapan1121 已提交
446
    TSWAP(ctx->type.selfType, ctx->type.peerType);
D
dapan1121 已提交
447
    SCL_ERR_JRET(sclInitParam(node->pRight, &paramList[1], ctx, rowNum));
D
dapan1121 已提交
448 449
  }

D
dapan1121 已提交
450
  *pParams = paramList;
D
dapan1121 已提交
451
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
452 453

_return:
wafwerar's avatar
wafwerar 已提交
454
  taosMemoryFreeClear(paramList);
D
dapan1121 已提交
455
  SCL_RET(code);
D
dapan1121 已提交
456 457
}

458
int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *output) {
D
dapan1121 已提交
459 460
  SScalarParam *params = NULL;
  int32_t rowNum = 0;
D
dapan 已提交
461
  int32_t paramNum = 0;
D
dapan1121 已提交
462
  int32_t code = 0;
D
dapan 已提交
463
  SCL_ERR_RET(sclInitParamList(&params, node->pParameterList, ctx, &paramNum, &rowNum));
D
dapan1121 已提交
464

D
dapan1121 已提交
465
  if (fmIsUserDefinedFunc(node->funcId)) {
466
    code = callUdfScalarFunc(node->functionName, params, paramNum, output);
467 468 469 470
    if (code != 0) {
      sclError("fmExecFunction error. callUdfScalarFunc. function name: %s, udf code:%d", node->functionName, code);
      goto _return;
    }
D
dapan1121 已提交
471 472 473 474 475 476 477
  } else {
    SScalarFuncExecFuncs ffpSet = {0};
    code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet);
    if (code) {
      sclError("fmGetFuncExecFuncs failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
      SCL_ERR_JRET(code);
    }
G
Ganlin Zhao 已提交
478

H
Haojun Liao 已提交
479 480 481
    code = sclCreateColumnInfoData(&node->node.resType, rowNum, output);
    if (code != TSDB_CODE_SUCCESS) {
      SCL_ERR_JRET(code);
D
dapan1121 已提交
482 483 484 485 486 487 488
    }

    code = (*ffpSet.process)(params, paramNum, output);
    if (code) {
      sclError("scalar function exec failed, funcId:%d, code:%s", node->funcId, tstrerror(code));
      SCL_ERR_JRET(code);
    }
D
dapan1121 已提交
489 490 491 492
  }

_return:

D
dapan1121 已提交
493
  sclFreeParamList(params, paramNum);
D
dapan1121 已提交
494 495 496 497 498 499 500 501 502 503 504 505
  SCL_RET(code);
}

int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *output) {
  if (NULL == node->pParameterList || node->pParameterList->length <= 0) {
    sclError("invalid logic parameter list, list:%p, paramNum:%d", node->pParameterList, node->pParameterList ? node->pParameterList->length : 0);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (TSDB_DATA_TYPE_BOOL != node->node.resType.type) {
    sclError("invalid logic resType, type:%d", node->node.resType.type);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
D
dapan1121 已提交
506 507
  }

D
dapan1121 已提交
508 509 510 511 512 513 514
  if (LOGIC_COND_TYPE_NOT == node->condType && node->pParameterList->length > 1) {
    sclError("invalid NOT operation parameter number, paramNum:%d", node->pParameterList->length);
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SScalarParam *params = NULL;
  int32_t rowNum = 0;
D
dapan 已提交
515
  int32_t paramNum = 0;
D
dapan1121 已提交
516
  int32_t code = 0;
D
dapan 已提交
517
  SCL_ERR_RET(sclInitParamList(&params, node->pParameterList, ctx, &paramNum, &rowNum));
D
dapan1121 已提交
518 519 520 521
  if (NULL == params) {
    output->numOfRows = 0;
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
522

523 524 525 526
  int32_t type = node->node.resType.type;
  output->numOfRows = rowNum;

  SDataType t = {.type = type, .bytes = tDataTypes[type].bytes};
H
Haojun Liao 已提交
527 528 529
  code = sclCreateColumnInfoData(&t, rowNum, output);
  if (code != TSDB_CODE_SUCCESS) {
    SCL_ERR_JRET(code);
D
dapan1121 已提交
530
  }
D
dapan1121 已提交
531

D
dapan1121 已提交
532
  bool value = false;
D
dapan 已提交
533
  bool complete = true;
D
dapan1121 已提交
534
  for (int32_t i = 0; i < rowNum; ++i) {
D
dapan 已提交
535 536
    complete = true;
    for (int32_t m = 0; m < paramNum; ++m) {
D
dapan1121 已提交
537
      if (NULL == params[m].columnData) {
D
dapan 已提交
538
        complete = false;
D
dapan1121 已提交
539 540
        continue;
      }
541 542 543
      char* p = colDataGetData(params[m].columnData, i);
      GET_TYPED_DATA(value, bool, params[m].columnData->info.type, p);

D
dapan1121 已提交
544
      if (LOGIC_COND_TYPE_AND == node->condType && (false == value)) {
D
dapan1121 已提交
545
        complete = true;
D
dapan1121 已提交
546 547
        break;
      } else if (LOGIC_COND_TYPE_OR == node->condType && value) {
D
dapan1121 已提交
548
        complete = true;
D
dapan1121 已提交
549 550 551 552 553 554
        break;
      } else if (LOGIC_COND_TYPE_NOT == node->condType) {
        value = !value;
      }
    }

D
dapan 已提交
555 556 557
    if (complete) {
      colDataAppend(output->columnData, i, (char*) &value, false);
    }
D
dapan1121 已提交
558 559
  }

D
dapan1121 已提交
560 561 562 563 564
  if (SCL_IS_CONST_CALC(ctx) && (false == complete)) {
    sclFreeParam(output);
    output->numOfRows = 0;
  }

D
dapan1121 已提交
565
_return:
D
dapan1121 已提交
566

D
dapan1121 已提交
567
  sclFreeParamList(params, paramNum);
D
dapan1121 已提交
568
  SCL_RET(code);
D
dapan1121 已提交
569 570 571 572 573 574
}

int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *output) {
  SScalarParam *params = NULL;
  int32_t rowNum = 0;
  int32_t code = 0;
575

wmmhello's avatar
wmmhello 已提交
576
  // json not support in in operator
H
Haojun Liao 已提交
577
  if (nodeType(node->pLeft) == QUERY_NODE_VALUE) {
wmmhello's avatar
wmmhello 已提交
578
    SValueNode *valueNode = (SValueNode *)node->pLeft;
H
Haojun Liao 已提交
579
    if (valueNode->node.resType.type == TSDB_DATA_TYPE_JSON && (node->opType == OP_TYPE_IN || node->opType == OP_TYPE_NOT_IN)) {
wmmhello's avatar
wmmhello 已提交
580 581 582 583
      SCL_RET(TSDB_CODE_QRY_JSON_IN_ERROR);
    }
  }

D
dapan1121 已提交
584
  SCL_ERR_RET(sclInitOperatorParams(&params, node, ctx, &rowNum));
585
  if (output->columnData == NULL) {
H
Haojun Liao 已提交
586 587 588 589
    code = sclCreateColumnInfoData(&node->node.resType, rowNum, output);
    if (code != TSDB_CODE_SUCCESS) {
      SCL_ERR_JRET(code);
    }
D
dapan1121 已提交
590 591 592 593
  }

  _bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(node->opType);

D
dapan1121 已提交
594
  int32_t paramNum = scalarGetOperatorParamNum(node->opType);
D
dapan1121 已提交
595 596
  SScalarParam* pLeft = &params[0];
  SScalarParam* pRight = paramNum > 1 ? &params[1] : NULL;
597

wmmhello's avatar
wmmhello 已提交
598
  terrno = TSDB_CODE_SUCCESS;
D
dapan 已提交
599
  OperatorFn(pLeft, pRight, output, TSDB_ORDER_ASC);
wmmhello's avatar
wmmhello 已提交
600
  code = terrno;
D
dapan1121 已提交
601 602

_return:
D
dapan1121 已提交
603

D
dapan1121 已提交
604
  sclFreeParamList(params, paramNum);
D
dapan1121 已提交
605
  SCL_RET(code);
D
dapan1121 已提交
606 607
}

D
dapan1121 已提交
608
EDealRes sclRewriteNullInOptr(SNode** pNode, SScalarCtx *ctx, EOperatorType opType) {
D
dapan1121 已提交
609 610 611
  if (opType <= OP_TYPE_CALC_MAX) {
    SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
    if (NULL == res) {
G
Ganlin Zhao 已提交
612
      sclError("make value node failed");
D
dapan1121 已提交
613 614 615
      ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
      return DEAL_RES_ERROR;
    }
G
Ganlin Zhao 已提交
616

D
dapan1121 已提交
617
    res->node.resType.type = TSDB_DATA_TYPE_NULL;
G
Ganlin Zhao 已提交
618

D
dapan1121 已提交
619 620 621 622 623
    nodesDestroyNode(*pNode);
    *pNode = (SNode*)res;
  } else {
    SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
    if (NULL == res) {
G
Ganlin Zhao 已提交
624
      sclError("make value node failed");
D
dapan1121 已提交
625 626 627
      ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
      return DEAL_RES_ERROR;
    }
G
Ganlin Zhao 已提交
628

D
dapan1121 已提交
629
    res->node.resType.type = TSDB_DATA_TYPE_BOOL;
D
dapan1121 已提交
630
    res->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes;
D
dapan1121 已提交
631
    res->datum.b = false;
G
Ganlin Zhao 已提交
632

D
dapan1121 已提交
633 634 635 636 637 638 639
    nodesDestroyNode(*pNode);
    *pNode = (SNode*)res;
  }

  return DEAL_RES_CONTINUE;
}

D
dapan1121 已提交
640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657
EDealRes sclAggFuncWalker(SNode* pNode, void* pContext) {
  if (QUERY_NODE_FUNCTION == nodeType(pNode)) {
    SFunctionNode* pFunc = (SFunctionNode*)pNode;
    *(bool*)pContext = fmIsAggFunc(pFunc->funcId);
    if (*(bool*)pContext) {
      return DEAL_RES_END;
    }
  }

  return DEAL_RES_CONTINUE;
}


bool sclContainsAggFuncNode(SNode* pNode) {
  bool aggFunc = false;
  nodesWalkExpr(pNode, sclAggFuncWalker, (void *)&aggFunc);
  return aggFunc;
}
D
dapan1121 已提交
658

D
dapan 已提交
659
EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) {
D
dapan1121 已提交
660
  SOperatorNode *node = (SOperatorNode *)*pNode;
D
dapan1121 已提交
661
  int32_t code = 0;
D
dapan1121 已提交
662 663 664

  if (node->pLeft && (QUERY_NODE_VALUE == nodeType(node->pLeft))) {
    SValueNode *valueNode = (SValueNode *)node->pLeft;
G
Ganlin Zhao 已提交
665
    if (SCL_IS_NULL_VALUE_NODE(valueNode) && (node->opType != OP_TYPE_IS_NULL && node->opType != OP_TYPE_IS_NOT_NULL)
D
dapan1121 已提交
666 667
        && (!sclContainsAggFuncNode(node->pRight))) {
      return sclRewriteNullInOptr(pNode, ctx, node->opType);
D
dapan1121 已提交
668
    }
D
dapan 已提交
669

G
Ganlin Zhao 已提交
670
    if (IS_STR_DATA_TYPE(valueNode->node.resType.type) && node->pRight && nodesIsExprNode(node->pRight)
D
dapan 已提交
671
      && ((SExprNode*)node->pRight)->resType.type == TSDB_DATA_TYPE_TIMESTAMP) {
D
dapan1121 已提交
672 673 674 675 676
      code = sclConvertToTsValueNode(((SExprNode*)node->pRight)->resType.precision, valueNode);
      if (code) {
        ctx->code = code;
        return DEAL_RES_ERROR;
      }
D
dapan 已提交
677
    }
D
dapan1121 已提交
678 679 680 681
  }

  if (node->pRight && (QUERY_NODE_VALUE == nodeType(node->pRight))) {
    SValueNode *valueNode = (SValueNode *)node->pRight;
D
dapan1121 已提交
682 683 684
    if (SCL_IS_NULL_VALUE_NODE(valueNode) && (node->opType != OP_TYPE_IS_NULL && node->opType != OP_TYPE_IS_NOT_NULL)
       && (!sclContainsAggFuncNode(node->pLeft))) {
      return sclRewriteNullInOptr(pNode, ctx, node->opType);
D
dapan1121 已提交
685
    }
D
dapan 已提交
686

G
Ganlin Zhao 已提交
687
    if (IS_STR_DATA_TYPE(valueNode->node.resType.type) && node->pLeft && nodesIsExprNode(node->pLeft)
D
dapan 已提交
688
      && ((SExprNode*)node->pLeft)->resType.type == TSDB_DATA_TYPE_TIMESTAMP) {
D
dapan1121 已提交
689 690 691 692 693
      code = sclConvertToTsValueNode(((SExprNode*)node->pLeft)->resType.precision, valueNode);
      if (code) {
        ctx->code = code;
        return DEAL_RES_ERROR;
      }
D
dapan 已提交
694
    }
D
dapan1121 已提交
695 696 697 698 699 700 701 702 703 704 705
  }

  if (node->pRight && (QUERY_NODE_NODE_LIST == nodeType(node->pRight))) {
    SNodeListNode *listNode = (SNodeListNode *)node->pRight;
    SNode* tnode = NULL;
    WHERE_EACH(tnode, listNode->pNodeList) {
      if (SCL_IS_NULL_VALUE_NODE(tnode)) {
        if (node->opType == OP_TYPE_IN) {
          ERASE_NODE(listNode->pNodeList);
          continue;
        } else { //OP_TYPE_NOT_IN
D
dapan1121 已提交
706
          return sclRewriteNullInOptr(pNode, ctx, node->opType);
D
dapan1121 已提交
707 708 709 710 711 712 713
        }
      }

      WHERE_NEXT;
    }

    if (listNode->pNodeList->length <= 0) {
D
dapan1121 已提交
714
      return sclRewriteNullInOptr(pNode, ctx, node->opType);
D
dapan1121 已提交
715 716 717 718 719 720
    }
  }

  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
721
EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) {
D
dapan1121 已提交
722
  SFunctionNode *node = (SFunctionNode *)*pNode;
D
dapan1121 已提交
723
  SNode* tnode = NULL;
724
  if (!fmIsScalarFunc(node->funcId) && (!ctx->dual)) {
G
Ganlin Zhao 已提交
725 726
    return DEAL_RES_CONTINUE;
  }
727

D
dapan1121 已提交
728 729 730 731 732 733
  FOREACH(tnode, node->pParameterList) {
    if (!SCL_IS_CONST_NODE(tnode)) {
      return DEAL_RES_CONTINUE;
    }
  }

D
dapan1121 已提交
734
  SScalarParam output = {0};
735

736
  ctx->code = sclExecFunction(node, ctx, &output);
D
dapan 已提交
737
  if (ctx->code) {
D
dapan1121 已提交
738 739 740
    return DEAL_RES_ERROR;
  }

D
dapan1121 已提交
741
  SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
D
dapan1121 已提交
742 743
  if (NULL == res) {
    sclError("make value node failed");
D
dapan1121 已提交
744
    sclFreeParam(&output);
D
dapan 已提交
745
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
D
dapan1121 已提交
746 747 748
    return DEAL_RES_ERROR;
  }

749 750
  res->translate = true;

751 752 753 754
  res->node.resType.type = output.columnData->info.type;
  res->node.resType.bytes = output.columnData->info.bytes;
  res->node.resType.scale = output.columnData->info.scale;
  res->node.resType.precision = output.columnData->info.precision;
755
  if (colDataIsNull_s(output.columnData, 0)) {
756
    res->isNull = true;
D
dapan1121 已提交
757
  } else {
758
    int32_t type = output.columnData->info.type;
wmmhello's avatar
wmmhello 已提交
759 760 761 762 763
    if (type == TSDB_DATA_TYPE_JSON){
      int32_t len = getJsonValueLen(output.columnData->pData);
      res->datum.p = taosMemoryCalloc(len, 1);
      memcpy(res->datum.p, output.columnData->pData, len);
    } else if (IS_VAR_DATA_TYPE(type)) {
764 765 766
      //res->datum.p = taosMemoryCalloc(res->node.resType.bytes + VARSTR_HEADER_SIZE + 1, 1);
      res->datum.p = taosMemoryCalloc(varDataTLen(output.columnData->pData), 1);
      res->node.resType.bytes = varDataTLen(output.columnData->pData);
767
      memcpy(res->datum.p, output.columnData->pData, varDataTLen(output.columnData->pData));
768
    } else {
D
dapan1121 已提交
769
      nodesSetValueNodeValue(res, output.columnData->pData);
770
    }
D
dapan1121 已提交
771
  }
772

D
dapan1121 已提交
773 774 775
  nodesDestroyNode(*pNode);
  *pNode = (SNode*)res;

D
dapan1121 已提交
776
  sclFreeParam(&output);
D
dapan1121 已提交
777 778 779
  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
780
EDealRes sclRewriteLogic(SNode** pNode, SScalarCtx *ctx) {
D
dapan1121 已提交
781 782
  SLogicConditionNode *node = (SLogicConditionNode *)*pNode;

H
Haojun Liao 已提交
783
  SScalarParam output = {0};
D
dapan 已提交
784 785
  ctx->code = sclExecLogic(node, ctx, &output);
  if (ctx->code) {
D
dapan1121 已提交
786 787 788
    return DEAL_RES_ERROR;
  }

D
dapan1121 已提交
789 790 791 792
  if (0 == output.numOfRows) {
    return DEAL_RES_CONTINUE;
  }

D
dapan1121 已提交
793
  SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
D
dapan1121 已提交
794 795
  if (NULL == res) {
    sclError("make value node failed");
796
    sclFreeParam(&output);
D
dapan 已提交
797
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
D
dapan1121 已提交
798 799 800 801
    return DEAL_RES_ERROR;
  }

  res->node.resType = node->node.resType;
802
  res->translate = true;
D
dapan1121 已提交
803

804 805 806 807
  int32_t type = output.columnData->info.type;
  if (IS_VAR_DATA_TYPE(type)) {
    res->datum.p = output.columnData->pData;
    output.columnData->pData = NULL;
D
dapan1121 已提交
808
  } else {
D
dapan1121 已提交
809
    nodesSetValueNodeValue(res, output.columnData->pData);
D
dapan1121 已提交
810
  }
D
dapan1121 已提交
811 812 813 814

  nodesDestroyNode(*pNode);
  *pNode = (SNode*)res;

D
dapan1121 已提交
815
  sclFreeParam(&output);
D
dapan1121 已提交
816 817 818
  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
819
EDealRes sclRewriteOperator(SNode** pNode, SScalarCtx *ctx) {
D
dapan1121 已提交
820
  SOperatorNode *node = (SOperatorNode *)*pNode;
D
dapan1121 已提交
821

D
dapan1121 已提交
822
  if ((!SCL_IS_CONST_NODE(node->pLeft)) || (!SCL_IS_CONST_NODE(node->pRight))) {
D
dapan 已提交
823
    return sclRewriteNonConstOperator(pNode, ctx);
D
dapan1121 已提交
824 825
  }

H
Haojun Liao 已提交
826
  SScalarParam output = {0};
D
dapan 已提交
827 828
  ctx->code = sclExecOperator(node, ctx, &output);
  if (ctx->code) {
D
dapan1121 已提交
829 830 831
    return DEAL_RES_ERROR;
  }

D
dapan1121 已提交
832
  SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
D
dapan1121 已提交
833
  if (NULL == res) {
834 835
    sclError("make value node failed");
    sclFreeParam(&output);
D
dapan 已提交
836
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
D
dapan1121 已提交
837 838 839
    return DEAL_RES_ERROR;
  }

840
  res->translate = true;
D
dapan1121 已提交
841

842
  res->node.resType = node->node.resType;
843
  if (colDataIsNull_s(output.columnData, 0)) {
844 845
    res->isNull = true;
    res->node.resType = node->node.resType;
846
  } else {
847 848 849 850 851
    int32_t type = output.columnData->info.type;
    if (IS_VAR_DATA_TYPE(type)) {  // todo refactor
      res->datum.p = output.columnData->pData;
      output.columnData->pData = NULL;
    } else {
G
Ganlin Zhao 已提交
852
      nodesSetValueNodeValue(res, output.columnData->pData);
853
    }
D
dapan1121 已提交
854
  }
D
dapan1121 已提交
855 856 857 858

  nodesDestroyNode(*pNode);
  *pNode = (SNode*)res;

H
Haojun Liao 已提交
859
  sclFreeParam(&output);
D
dapan1121 已提交
860 861 862 863
  return DEAL_RES_CONTINUE;
}

EDealRes sclConstantsRewriter(SNode** pNode, void* pContext) {
D
dapan 已提交
864 865
  SScalarCtx *ctx = (SScalarCtx *)pContext;

D
dapan1121 已提交
866
  if (QUERY_NODE_FUNCTION == nodeType(*pNode)) {
D
dapan 已提交
867
    return sclRewriteFunction(pNode, ctx);
D
dapan1121 已提交
868 869 870
  }

  if (QUERY_NODE_LOGIC_CONDITION == nodeType(*pNode)) {
D
dapan 已提交
871
    return sclRewriteLogic(pNode, ctx);
D
dapan1121 已提交
872 873
  }

D
dapan1121 已提交
874
  if (QUERY_NODE_OPERATOR == nodeType(*pNode)) {
D
dapan 已提交
875
    return sclRewriteOperator(pNode, ctx);
876
  }
877 878

  return DEAL_RES_CONTINUE;
D
dapan1121 已提交
879 880
}

D
dapan 已提交
881
EDealRes sclWalkFunction(SNode* pNode, SScalarCtx *ctx) {
D
dapan1121 已提交
882
  SFunctionNode *node = (SFunctionNode *)pNode;
D
dapan1121 已提交
883
  SScalarParam output = {0};
884

885
  ctx->code = sclExecFunction(node, ctx, &output);
D
dapan1121 已提交
886 887
  if (ctx->code) {
    return DEAL_RES_ERROR;
D
dapan1121 已提交
888 889
  }

D
dapan1121 已提交
890
  if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
D
dapan1121 已提交
891 892
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
D
dapan1121 已提交
893 894
  }

D
dapan1121 已提交
895 896 897
  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
898
EDealRes sclWalkLogic(SNode* pNode, SScalarCtx *ctx) {
D
dapan1121 已提交
899
  SLogicConditionNode *node = (SLogicConditionNode *)pNode;
D
dapan1121 已提交
900
  SScalarParam output = {0};
901

D
dapan1121 已提交
902 903 904
  ctx->code = sclExecLogic(node, ctx, &output);
  if (ctx->code) {
    return DEAL_RES_ERROR;
D
dapan1121 已提交
905 906
  }

D
dapan1121 已提交
907
  if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
D
dapan1121 已提交
908
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
D
dapan1121 已提交
909
    return DEAL_RES_ERROR;
D
dapan1121 已提交
910 911 912 913 914
  }

  return DEAL_RES_CONTINUE;
}

D
dapan 已提交
915
EDealRes sclWalkOperator(SNode* pNode, SScalarCtx *ctx) {
D
dapan1121 已提交
916
  SOperatorNode *node = (SOperatorNode *)pNode;
D
dapan1121 已提交
917
  SScalarParam output = {0};
G
Ganlin Zhao 已提交
918

D
dapan1121 已提交
919 920
  ctx->code = sclExecOperator(node, ctx, &output);
  if (ctx->code) {
D
dapan1121 已提交
921 922
    return DEAL_RES_ERROR;
  }
D
dapan1121 已提交
923

D
dapan1121 已提交
924
  if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
D
dapan1121 已提交
925
    ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
D
dapan1121 已提交
926 927 928
    return DEAL_RES_ERROR;
  }

D
dapan1121 已提交
929 930 931
  return DEAL_RES_CONTINUE;
}

932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973
EDealRes sclWalkTarget(SNode* pNode, SScalarCtx *ctx) {
  STargetNode *target = (STargetNode *)pNode;

  if (target->dataBlockId >= taosArrayGetSize(ctx->pBlockList)) {
    sclError("target tupleId is too big, tupleId:%d, dataBlockNum:%d", target->dataBlockId, (int32_t)taosArrayGetSize(ctx->pBlockList));
    ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
    return DEAL_RES_ERROR;
  }

  int32_t index = -1;
  for(int32_t i = 0; i < taosArrayGetSize(ctx->pBlockList); ++i) {
    SSDataBlock* pb = taosArrayGetP(ctx->pBlockList, i);
    if (pb->info.blockId == target->dataBlockId) {
      index = i;
      break;
    }
  }

  if (index == -1) {
    sclError("column tupleId is too big, tupleId:%d, dataBlockNum:%d", target->dataBlockId, (int32_t)taosArrayGetSize(ctx->pBlockList));
    ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
    return DEAL_RES_ERROR;
  }

  SSDataBlock *block = *(SSDataBlock **)taosArrayGet(ctx->pBlockList, index);

  if (target->slotId >= taosArrayGetSize(block->pDataBlock)) {
    sclError("target slot not exist, dataBlockId:%d, slotId:%d, dataBlockNum:%d", target->dataBlockId, target->slotId, (int32_t)taosArrayGetSize(block->pDataBlock));
    ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
    return DEAL_RES_ERROR;
  }

  // if block->pDataBlock is not enough, there are problems if target->slotId bigger than the size of block->pDataBlock,
  SColumnInfoData *col = taosArrayGet(block->pDataBlock, target->slotId);

  SScalarParam *res = (SScalarParam *)taosHashGet(ctx->pRes, (void *)&target->pExpr, POINTER_BYTES);
  if (NULL == res) {
    sclError("no valid res in hash, node:%p, type:%d", target->pExpr, nodeType(target->pExpr));
    ctx->code = TSDB_CODE_QRY_APP_ERROR;
    return DEAL_RES_ERROR;
  }

974
  colDataAssign(col, res->columnData, res->numOfRows, NULL);
975 976 977 978 979 980
  block->info.rows = res->numOfRows;

  sclFreeParam(res);
  taosHashRemove(ctx->pRes, (void *)&target->pExpr, POINTER_BYTES);
  return DEAL_RES_CONTINUE;
}
D
dapan 已提交
981

D
dapan1121 已提交
982
EDealRes sclCalcWalker(SNode* pNode, void* pContext) {
D
dapan1121 已提交
983
  if (QUERY_NODE_VALUE == nodeType(pNode) || QUERY_NODE_NODE_LIST == nodeType(pNode) || QUERY_NODE_COLUMN == nodeType(pNode)|| QUERY_NODE_LEFT_VALUE == nodeType(pNode)) {
D
dapan1121 已提交
984
    return DEAL_RES_CONTINUE;
D
dapan1121 已提交
985
  }
D
dapan 已提交
986 987

  SScalarCtx *ctx = (SScalarCtx *)pContext;
D
dapan1121 已提交
988
  if (QUERY_NODE_FUNCTION == nodeType(pNode)) {
D
dapan 已提交
989
    return sclWalkFunction(pNode, ctx);
D
dapan1121 已提交
990
  }
D
dapan1121 已提交
991

D
dapan1121 已提交
992
  if (QUERY_NODE_LOGIC_CONDITION == nodeType(pNode)) {
D
dapan 已提交
993
    return sclWalkLogic(pNode, ctx);
D
dapan1121 已提交
994
  }
D
dapan1121 已提交
995

D
dapan1121 已提交
996
  if (QUERY_NODE_OPERATOR == nodeType(pNode)) {
D
dapan 已提交
997
    return sclWalkOperator(pNode, ctx);
D
dapan1121 已提交
998
  }
D
dapan1121 已提交
999

1000 1001 1002
  if (QUERY_NODE_TARGET == nodeType(pNode)) {
    return sclWalkTarget(pNode, ctx);
  }
D
dapan1121 已提交
1003

D
dapan 已提交
1004
  sclError("invalid node type for scalar calculating, type:%d", nodeType(pNode));
D
dapan1121 已提交
1005 1006
  ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
  return DEAL_RES_ERROR;
D
dapan1121 已提交
1007 1008
}

D
dapan1121 已提交
1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027
int32_t sclExtendResRows(SScalarParam *pDst, SScalarParam *pSrc, SArray *pBlockList) {
  SSDataBlock* pb = taosArrayGetP(pBlockList, 0);
  SScalarParam *pLeft = taosMemoryCalloc(1, sizeof(SScalarParam));
  if (NULL == pLeft) {
    sclError("calloc %d failed", (int32_t)sizeof(SScalarParam));
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  pLeft->numOfRows = pb->info.rows;
  colInfoDataEnsureCapacity(pDst->columnData, pb->info.rows);

  _bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(OP_TYPE_ASSIGN);
  OperatorFn(pLeft, pSrc, pDst, TSDB_ORDER_ASC);

  taosMemoryFree(pLeft);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1028
int32_t sclCalcConstants(SNode *pNode, bool dual, SNode **pRes) {
D
dapan1121 已提交
1029 1030 1031 1032 1033
  if (NULL == pNode) {
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t code = 0;
D
dapan 已提交
1034
  SScalarCtx ctx = {0};
D
dapan1121 已提交
1035
  ctx.dual = dual;
1036
  ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
D
dapan 已提交
1037
  if (NULL == ctx.pRes) {
1038
    sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM);
D
dapan 已提交
1039 1040
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
G
Ganlin Zhao 已提交
1041

X
Xiaoyu Wang 已提交
1042
  nodesRewriteExprPostOrder(&pNode, sclConstantsRewriter, (void *)&ctx);
D
dapan 已提交
1043
  SCL_ERR_JRET(ctx.code);
D
dapan1121 已提交
1044 1045
  *pRes = pNode;

D
dapan 已提交
1046
_return:
1047

D
dapan 已提交
1048 1049
  sclFreeRes(ctx.pRes);
  return code;
D
dapan1121 已提交
1050
}
D
dapan1121 已提交
1051

D
dapan1121 已提交
1052
static int32_t sclGetMinusOperatorResType(SOperatorNode* pOp) {
1053 1054 1055 1056 1057 1058 1059 1060
  if (!IS_MATHABLE_TYPE(((SExprNode*)(pOp->pLeft))->resType.type)) {
    return TSDB_CODE_TSC_INVALID_OPERATION;
  }
  pOp->node.resType.type = TSDB_DATA_TYPE_DOUBLE;
  pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes;
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1061
static int32_t sclGetMathOperatorResType(SOperatorNode* pOp) {
1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082
  SDataType ldt = ((SExprNode*)(pOp->pLeft))->resType;
  SDataType rdt = ((SExprNode*)(pOp->pRight))->resType;
  if ((TSDB_DATA_TYPE_TIMESTAMP == ldt.type && TSDB_DATA_TYPE_TIMESTAMP == rdt.type) ||
      (TSDB_DATA_TYPE_TIMESTAMP == ldt.type && (IS_VAR_DATA_TYPE(rdt.type) || IS_FLOAT_TYPE(rdt.type))) ||
      (TSDB_DATA_TYPE_TIMESTAMP == rdt.type && (IS_VAR_DATA_TYPE(ldt.type) || IS_FLOAT_TYPE(ldt.type)))) {
    return TSDB_CODE_TSC_INVALID_OPERATION;
  }

  if ((TSDB_DATA_TYPE_TIMESTAMP == ldt.type && IS_INTEGER_TYPE(rdt.type)) ||
      (TSDB_DATA_TYPE_TIMESTAMP == rdt.type && IS_INTEGER_TYPE(ldt.type)) ||
      (TSDB_DATA_TYPE_TIMESTAMP == ldt.type && TSDB_DATA_TYPE_BOOL == rdt.type) ||
      (TSDB_DATA_TYPE_TIMESTAMP == rdt.type && TSDB_DATA_TYPE_BOOL == ldt.type)) {
    pOp->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
    pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes;
  } else {
    pOp->node.resType.type = TSDB_DATA_TYPE_DOUBLE;
    pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes;
  }
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1083
static int32_t sclGetCompOperatorResType(SOperatorNode* pOp) {
1084
  SDataType ldt = ((SExprNode*)(pOp->pLeft))->resType;
1085 1086 1087
  if (OP_TYPE_IN == pOp->opType || OP_TYPE_NOT_IN == pOp->opType) {
    ((SExprNode*)(pOp->pRight))->resType = ldt;
  } else if (nodesIsRegularOp(pOp)) {
1088
    SDataType rdt = ((SExprNode*)(pOp->pRight))->resType;
1089 1090 1091
    if (!IS_VAR_DATA_TYPE(ldt.type) || QUERY_NODE_VALUE != nodeType(pOp->pRight) ||
        (!IS_STR_DATA_TYPE(rdt.type) && (rdt.type != TSDB_DATA_TYPE_NULL))) {
      return TSDB_CODE_TSC_INVALID_OPERATION;
1092 1093 1094 1095 1096 1097 1098
    }
  }
  pOp->node.resType.type = TSDB_DATA_TYPE_BOOL;
  pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes;
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1099
static int32_t sclGetJsonOperatorResType(SOperatorNode* pOp) {
1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113
  SDataType ldt = ((SExprNode*)(pOp->pLeft))->resType;
  SDataType rdt = ((SExprNode*)(pOp->pRight))->resType;
  if (TSDB_DATA_TYPE_JSON != ldt.type || !IS_STR_DATA_TYPE(rdt.type)) {
    return TSDB_CODE_TSC_INVALID_OPERATION;
  }
  if (pOp->opType == OP_TYPE_JSON_GET_VALUE) {
    pOp->node.resType.type = TSDB_DATA_TYPE_JSON;
  } else if (pOp->opType == OP_TYPE_JSON_CONTAINS) {
    pOp->node.resType.type = TSDB_DATA_TYPE_BOOL;
  }
  pOp->node.resType.bytes = tDataTypes[pOp->node.resType.type].bytes;
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1114
static int32_t sclGetBitwiseOperatorResType(SOperatorNode* pOp) {
1115 1116 1117 1118 1119
  pOp->node.resType.type = TSDB_DATA_TYPE_BIGINT;
  pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142

int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes) {
  return sclCalcConstants(pNode, false, pRes);
}

int32_t scalarCalculateConstantsFromDual(SNode *pNode, SNode **pRes) {
  return sclCalcConstants(pNode, true, pRes);
}

int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
  if (NULL == pNode || NULL == pBlockList) {
    SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t code = 0;
  SScalarCtx ctx = {.code = 0, .pBlockList = pBlockList, .param = pDst ? pDst->param : NULL};

  // TODO: OPT performance
  ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (NULL == ctx.pRes) {
    sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM);
    SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
1143

D
dapan1121 已提交
1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171
  nodesWalkExprPostOrder(pNode, sclCalcWalker, (void *)&ctx);
  SCL_ERR_JRET(ctx.code);

  if (pDst) {
    SScalarParam *res = (SScalarParam *)taosHashGet(ctx.pRes, (void *)&pNode, POINTER_BYTES);
    if (NULL == res) {
      sclError("no valid res in hash, node:%p, type:%d", pNode, nodeType(pNode));
      SCL_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
    }

    if (1 == res->numOfRows) {
      SCL_ERR_JRET(sclExtendResRows(pDst, res, pBlockList));
    } else {
      colInfoDataEnsureCapacity(pDst->columnData, res->numOfRows);
      colDataAssign(pDst->columnData, res->columnData, res->numOfRows, NULL);
      pDst->numOfRows = res->numOfRows;
    }

    sclFreeParam(res);
    taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES);
  }

_return:
  //nodesDestroyNode(pNode);
  sclFreeRes(ctx.pRes);
  return code;
}

1172 1173 1174 1175 1176 1177 1178 1179
int32_t scalarGetOperatorResultType(SOperatorNode* pOp) {
  if (TSDB_DATA_TYPE_BLOB == ((SExprNode*)(pOp->pLeft))->resType.type ||
      (NULL != pOp->pRight && TSDB_DATA_TYPE_BLOB == ((SExprNode*)(pOp->pRight))->resType.type)) {
    return TSDB_CODE_TSC_INVALID_OPERATION;
  }

  switch (pOp->opType) {
    case OP_TYPE_ADD:
D
dapan1121 已提交
1180 1181 1182 1183
    case OP_TYPE_SUB:
    case OP_TYPE_MULTI:
    case OP_TYPE_DIV:
    case OP_TYPE_REM:
D
dapan1121 已提交
1184
      return sclGetMathOperatorResType(pOp);
D
dapan1121 已提交
1185
    case OP_TYPE_MINUS:
D
dapan1121 已提交
1186
      return sclGetMinusOperatorResType(pOp);
1187 1188 1189 1190 1191
    case OP_TYPE_ASSIGN:
      pOp->node.resType = ((SExprNode*)(pOp->pLeft))->resType;
      break;
    case OP_TYPE_BIT_AND:
    case OP_TYPE_BIT_OR:
D
dapan1121 已提交
1192
      return sclGetBitwiseOperatorResType(pOp);
D
dapan1121 已提交
1193 1194 1195 1196 1197 1198
    case OP_TYPE_GREATER_THAN:
    case OP_TYPE_GREATER_EQUAL:
    case OP_TYPE_LOWER_THAN:
    case OP_TYPE_LOWER_EQUAL:
    case OP_TYPE_EQUAL:
    case OP_TYPE_NOT_EQUAL:
1199 1200 1201 1202 1203 1204 1205 1206
    case OP_TYPE_IS_NULL:
    case OP_TYPE_IS_NOT_NULL:
    case OP_TYPE_IS_TRUE:
    case OP_TYPE_IS_FALSE:
    case OP_TYPE_IS_UNKNOWN:
    case OP_TYPE_IS_NOT_TRUE:
    case OP_TYPE_IS_NOT_FALSE:
    case OP_TYPE_IS_NOT_UNKNOWN:
D
dapan1121 已提交
1207 1208 1209 1210
    case OP_TYPE_LIKE:
    case OP_TYPE_NOT_LIKE:
    case OP_TYPE_MATCH:
    case OP_TYPE_NMATCH:
1211 1212
    case OP_TYPE_IN:
    case OP_TYPE_NOT_IN:
D
dapan1121 已提交
1213
      return sclGetCompOperatorResType(pOp);
D
dapan1121 已提交
1214
    case OP_TYPE_JSON_GET_VALUE:
1215
    case OP_TYPE_JSON_CONTAINS:
D
dapan1121 已提交
1216
      return sclGetJsonOperatorResType(pOp);
D
dapan1121 已提交
1217
    default:
1218
      break;
D
dapan1121 已提交
1219 1220
  }

1221 1222
  return TSDB_CODE_SUCCESS;
}