physicalPlan.c 33.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

X
Xiaoyu Wang 已提交
16 17
#if 0

18
#include "plannerInt.h"
X
Xiaoyu Wang 已提交
19
#include "exception.h"
20
#include "parser.h"
21

22 23 24
#define STORE_CURRENT_SUBPLAN(cxt) SSubplan* _ = cxt->pCurrentSubplan
#define RECOVERY_CURRENT_SUBPLAN(cxt) cxt->pCurrentSubplan = _

25 26 27 28 29 30 31
typedef struct SPlanContext {
  struct SCatalog* pCatalog;
  struct SQueryDag* pDag;
  SSubplan* pCurrentSubplan;
  SSubplanId nextId;
} SPlanContext;

32 33 34 35 36 37
static const char* gOpName[] = {
  "Unknown",
#define INCLUDE_AS_NAME
#include "plannerOp.h"
#undef INCLUDE_AS_NAME
};
X
Xiaoyu Wang 已提交
38

39
static void* validPointer(void* p) {
X
Xiaoyu Wang 已提交
40 41 42 43 44 45
  if (NULL == p) {
    THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
  }
  return p;
}

46 47 48 49
const char* opTypeToOpName(int32_t type) {
  return gOpName[type];
}

50 51
int32_t opNameToOpType(const char* name) {
  for (int32_t i = 1; i < sizeof(gOpName) / sizeof(gOpName[0]); ++i) {
52
    if (0 == strcmp(name, gOpName[i])) {
53 54 55 56 57
      return i;
    }
  }
  return OP_Unknown;
}
X
Xiaoyu Wang 已提交
58

59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
const char* dsinkTypeToDsinkName(int32_t type) {
  switch (type) {
    case DSINK_Dispatch:
      return "Dispatch";
    case DSINK_Insert:
      return "Insert";
    default:
      break;
  }
  return "Unknown";
}

int32_t dsinkNameToDsinkType(const char* name) {
  if (0 == strcmp(name, "Dispatch")) {
    return DSINK_Dispatch;
  } else if (0 == strcmp(name, "Insert")) {
    return DSINK_Insert;
  }
  return DSINK_Unknown;
}

80 81 82 83 84 85 86 87 88 89
static bool copySchema(SDataBlockSchema* dst, const SDataBlockSchema* src) {
  dst->pSchema = malloc(sizeof(SSlotSchema) * src->numOfCols);
  if (NULL == dst->pSchema) {
    return false;
  }
  memcpy(dst->pSchema, src->pSchema, sizeof(SSlotSchema) * src->numOfCols);
  dst->numOfCols = src->numOfCols;
  dst->resultRowSize = src->resultRowSize;
  dst->precision = src->precision;
  return true;
X
Xiaoyu Wang 已提交
90 91
}

92
static bool toDataBlockSchema(SQueryPlanNode* pPlanNode, SDataBlockSchema* dataBlockSchema) {
93
  dataBlockSchema->numOfCols = pPlanNode->numOfExpr;
94
  dataBlockSchema->pSchema = malloc(sizeof(SSlotSchema) * pPlanNode->numOfExpr);
95 96 97
  if (NULL == dataBlockSchema->pSchema) {
    return false;
  }
98

99
  dataBlockSchema->resultRowSize = 0;
100 101 102 103
  for (int32_t i = 0; i < pPlanNode->numOfExpr; ++i) {
    SExprInfo* pExprInfo = taosArrayGetP(pPlanNode->pExpr, i);
    memcpy(&dataBlockSchema->pSchema[i], &pExprInfo->base.resSchema, sizeof(SSlotSchema));

104 105
    dataBlockSchema->resultRowSize += dataBlockSchema->pSchema[i].bytes;
  }
106

107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
  return true;
}

static bool cloneExprArray(SArray** dst, SArray* src) {
  if (NULL == src) {
    return true;
  }
  size_t size = taosArrayGetSize(src);
  if (0 == size) {
    return true;
  }
  *dst = taosArrayInit(size, POINTER_BYTES);
  if (NULL == *dst) {
    return false;
  }
  return (TSDB_CODE_SUCCESS == copyAllExprInfo(*dst, src, true) ? true : false);
X
Xiaoyu Wang 已提交
123 124
}

125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
static SDataSink* initDataSink(int32_t type, int32_t size, const SPhyNode* pRoot) {
  SDataSink* sink = (SDataSink*)validPointer(calloc(1, size));
  sink->info.type = type;
  sink->info.name = dsinkTypeToDsinkName(type);
  if (NULL !=pRoot && !copySchema(&sink->schema, &pRoot->targetSchema)) {
    tfree(sink);
    THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
  }
  return sink;
}

static SDataSink* createDataInserter(SPlanContext* pCxt, SVgDataBlocks* pBlocks, const SPhyNode* pRoot) {
  SDataInserter* inserter = (SDataInserter*)initDataSink(DSINK_Insert, sizeof(SDataInserter), pRoot);
  inserter->numOfTables = pBlocks->numOfTables;
  inserter->size = pBlocks->size;
dengyihao's avatar
dengyihao 已提交
140
  TSWAP(inserter->pData, pBlocks->pData, char*);
141 142 143 144 145 146 147 148
  return (SDataSink*)inserter;
}

static SDataSink* createDataDispatcher(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, const SPhyNode* pRoot) {
  SDataDispatcher* dispatcher = (SDataDispatcher*)initDataSink(DSINK_Dispatch, sizeof(SDataDispatcher), pRoot);
  return (SDataSink*)dispatcher;
}

149
static SPhyNode* initPhyNode(SQueryPlanNode* pPlanNode, int32_t type, int32_t size) {
150
  SPhyNode* node = (SPhyNode*)validPointer(calloc(1, size));
X
Xiaoyu Wang 已提交
151
  node->info.type = type;
152 153 154
  node->info.name = opTypeToOpName(type);
  if (!cloneExprArray(&node->pTargets, pPlanNode->pExpr) || !toDataBlockSchema(pPlanNode, &(node->targetSchema))) {
    free(node);
155
    THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
156 157
  }
  return node;
158 159
}

160 161 162 163 164 165 166 167 168 169
static void cleanupPhyNode(SPhyNode* pPhyNode) {
  if (pPhyNode == NULL) {
    return;
  }

  dropOneLevelExprInfo(pPhyNode->pTargets);
  tfree(pPhyNode->targetSchema.pSchema);
  tfree(pPhyNode);
}

170
static SPhyNode* initScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t type, int32_t size) {
H
Haojun Liao 已提交
171 172 173 174 175 176 177 178
  SScanPhyNode* node = (SScanPhyNode*) initPhyNode(pPlanNode, type, size);

  STableMeta *pTableMeta = pTable->pMeta->pTableMeta;
  node->uid       = pTableMeta->uid;
  node->count     = 1;
  node->order     = TSDB_ORDER_ASC;
  node->tableType = pTableMeta->tableType;

179 180 181 182 183 184 185
  return (SPhyNode*)node;
}

static SPhyNode* createPseudoScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t op) {
  return initScanNode(pPlanNode, pTable, op, sizeof(SScanPhyNode));
}

186
static SPhyNode* createTagScanNode(SQueryPlanNode* pPlanNode) {
187 188 189 190 191 192
  SQueryTableInfo* pTable = (SQueryTableInfo*)pPlanNode->pExtInfo;
  return createPseudoScanNode(pPlanNode, pTable, OP_TagScan);
}

static uint8_t getScanFlag(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
  // todo
H
Haojun Liao 已提交
193
  return MAIN_SCAN;
194 195
}

H
Haojun Liao 已提交
196 197 198 199
static SPhyNode* createUserTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pQueryTableInfo, int32_t op) {
  STableScanPhyNode* node = (STableScanPhyNode*)initScanNode(pPlanNode, pQueryTableInfo, op, sizeof(STableScanPhyNode));
  node->scanFlag = getScanFlag(pPlanNode, pQueryTableInfo);
  node->window = pQueryTableInfo->window;
200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220
  // todo tag cond
  return (SPhyNode*)node;
}


static bool isSystemTable(SQueryTableInfo* pTable) {
  // todo
  return false;
}

static bool needSeqScan(SQueryPlanNode* pPlanNode) {
  // todo
  return false;
}

static SPhyNode* createMultiTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
  if (isSystemTable(pTable)) {
    return createPseudoScanNode(pPlanNode, pTable, OP_SystemTableScan);
  } else if (needSeqScan(pPlanNode)) {
    return createUserTableScanNode(pPlanNode, pTable, OP_TableSeqScan);
  }
H
Haojun Liao 已提交
221
  int32_t type = (pPlanNode->info.type == QNODE_TABLESCAN)? OP_TableScan:OP_StreamScan;
H
Haojun Liao 已提交
222
  return createUserTableScanNode(pPlanNode, pTable, type);
223 224 225
}

static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
226
  SSubplan* subplan = validPointer(calloc(1, sizeof(SSubplan)));
227 228
  subplan->id = pCxt->nextId;
  ++(pCxt->nextId.subplanId);
H
Haojun Liao 已提交
229 230

  subplan->type  = type;
231 232 233
  subplan->level = 0;
  if (NULL != pCxt->pCurrentSubplan) {
    subplan->level = pCxt->pCurrentSubplan->level + 1;
H
Haojun Liao 已提交
234 235
    if (NULL == pCxt->pCurrentSubplan->pChildren) {
      pCxt->pCurrentSubplan->pChildren = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
236
    }
H
Haojun Liao 已提交
237 238

    taosArrayPush(pCxt->pCurrentSubplan->pChildren, &subplan);
239
    subplan->pParents = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
240 241
    taosArrayPush(subplan->pParents, &pCxt->pCurrentSubplan);
  }
H
Haojun Liao 已提交
242

243 244
  SArray* currentLevel;
  if (subplan->level >= taosArrayGetSize(pCxt->pDag->pSubplans)) {
245
    currentLevel = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
246 247 248
    taosArrayPush(pCxt->pDag->pSubplans, &currentLevel);
  } else {
    currentLevel = taosArrayGetP(pCxt->pDag->pSubplans, subplan->level);
249
  }
H
Haojun Liao 已提交
250

251
  taosArrayPush(currentLevel, &subplan);
252
  pCxt->pCurrentSubplan = subplan;
X
Xiaoyu Wang 已提交
253
  ++(pCxt->pDag->numOfSubplans);
254 255 256
  return subplan;
}

H
Haojun Liao 已提交
257 258 259
static void vgroupInfoToNodeAddr(const SVgroupInfo* vg, SQueryNodeAddr* pNodeAddr) {
  pNodeAddr->nodeId = vg->vgId;
  pNodeAddr->epset  = vg->epset;
X
Xiaoyu Wang 已提交
260 261
}

262 263
static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, SQueryTableInfo* pTableInfo) {
  SVgroupsInfo* pVgroupList = pTableInfo->pMeta->vgroupList;
264
  for (int32_t i = 0; i < pVgroupList->numOfVgroups; ++i) {
265
    STORE_CURRENT_SUBPLAN(pCxt);
266
    SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN);
D
dapan 已提交
267
    subplan->msgType   = TDMT_VND_QUERY;
H
Haojun Liao 已提交
268 269

    vgroupInfoToNodeAddr(&(pTableInfo->pMeta->vgroupList->vgroups[i]), &subplan->execNode);
270
    subplan->pNode = createMultiTableScanNode(pPlanNode, pTableInfo);
271
    subplan->pDataSink = createDataDispatcher(pCxt, pPlanNode, subplan->pNode);
272
    RECOVERY_CURRENT_SUBPLAN(pCxt);
273
  }
274
  return pCxt->nextId.templateId++;
X
Xiaoyu Wang 已提交
275 276
}

277 278 279
static SPhyNode* createExchangeNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, uint64_t srcTemplateId) {
  SExchangePhyNode* node = (SExchangePhyNode*)initPhyNode(pPlanNode, OP_Exchange, sizeof(SExchangePhyNode));
  node->srcTemplateId = srcTemplateId;
H
Haojun Liao 已提交
280
  node->pSrcEndPoints = validPointer(taosArrayInit(TARRAY_MIN_SIZE, sizeof(SDownstreamSource)));
281 282
  return (SPhyNode*)node;
}
283

284 285 286
static bool needMultiNodeScan(SQueryTableInfo* pTable) {
  // todo system table, for instance, user_tables
  return (TSDB_SUPER_TABLE == pTable->pMeta->pTableMeta->tableType);
287 288
}

H
Haojun Liao 已提交
289 290 291 292
// TODO: the SVgroupInfo index
static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTableInfo, SSubplan* subplan) {
  SVgroupsInfo* pVgroupsInfo = pTableInfo->pMeta->vgroupList;
  vgroupInfoToNodeAddr(&(pVgroupsInfo->vgroups[0]), &subplan->execNode);
H
Haojun Liao 已提交
293
  int32_t type = (pPlanNode->info.type == QNODE_TABLESCAN)? OP_TableScan:OP_StreamScan;
H
Haojun Liao 已提交
294
  return createUserTableScanNode(pPlanNode, pTableInfo, type);
D
dapan 已提交
295 296
}

297
static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
298
  SQueryTableInfo* pTable = (SQueryTableInfo*)pPlanNode->pExtInfo;
299 300
  if (needMultiNodeScan(pTable)) {
    return createExchangeNode(pCxt, pPlanNode, splitSubplanByTable(pCxt, pPlanNode, pTable));
301
  }
302

D
dapan 已提交
303
  return createSingleTableScanNode(pPlanNode, pTable, pCxt->pCurrentSubplan);
304 305
}

306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324
static SPhyNode* createSingleTableAgg(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
  SAggPhyNode* node = (SAggPhyNode*)initPhyNode(pPlanNode, OP_Aggregate, sizeof(SAggPhyNode));
  SGroupbyExpr* pGroupBy = (SGroupbyExpr*)pPlanNode->pExtInfo;
  node->aggAlgo = AGG_ALGO_PLAIN;
  node->aggSplit = AGG_SPLIT_FINAL;
  if (NULL != pGroupBy) {
    node->aggAlgo = AGG_ALGO_HASHED;
    node->pGroupByList = validPointer(taosArrayDup(pGroupBy->columnInfo));
  }
  return (SPhyNode*)node;
}

static SPhyNode* createAggNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
  // if (needMultiNodeAgg(pPlanNode)) {

  // }
  return createSingleTableAgg(pCxt, pPlanNode);
}

325
static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
X
Xiaoyu Wang 已提交
326 327 328 329 330
  SPhyNode* node = NULL;
  switch (pPlanNode->info.type) {
    case QNODE_TAGSCAN:
      node = createTagScanNode(pPlanNode);
      break;
H
Haojun Liao 已提交
331
    case QNODE_STREAMSCAN:
X
Xiaoyu Wang 已提交
332
    case QNODE_TABLESCAN:
333
      node = createTableScanNode(pCxt, pPlanNode);
X
Xiaoyu Wang 已提交
334
      break;
335 336 337 338
    case QNODE_AGGREGATE:
    case QNODE_GROUPBY:
      node = createAggNode(pCxt, pPlanNode);
      break;
339
    case QNODE_MODIFY:
X
Xiaoyu Wang 已提交
340 341
      // Insert is not an operator in a physical plan.
      break;
X
Xiaoyu Wang 已提交
342 343 344
    default:
      assert(false);
  }
H
Haojun Liao 已提交
345

X
Xiaoyu Wang 已提交
346
  if (pPlanNode->pChildren != NULL && taosArrayGetSize(pPlanNode->pChildren) > 0) {
X
Xiaoyu Wang 已提交
347
    node->pChildren = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
X
Xiaoyu Wang 已提交
348 349
    size_t size = taosArrayGetSize(pPlanNode->pChildren);
    for(int32_t i = 0; i < size; ++i) {
350
      SPhyNode* child = createPhyNode(pCxt, taosArrayGetP(pPlanNode->pChildren, i));
X
Xiaoyu Wang 已提交
351 352 353
      child->pParent = node;
      taosArrayPush(node->pChildren, &child);
    }
354
  }
H
Haojun Liao 已提交
355

X
Xiaoyu Wang 已提交
356
  return node;
357 358
}

H
Haojun Liao 已提交
359 360 361 362 363
static void splitModificationOpSubPlan(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
  SDataPayloadInfo* pPayload = (SDataPayloadInfo*) pPlanNode->pExtInfo;

  size_t numOfVgroups = taosArrayGetSize(pPayload->payload);
  for (int32_t i = 0; i < numOfVgroups; ++i) {
X
Xiaoyu Wang 已提交
364
    STORE_CURRENT_SUBPLAN(pCxt);
X
Xiaoyu Wang 已提交
365
    SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MODIFY);
H
Haojun Liao 已提交
366 367
    SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(pPayload->payload, i);

H
Haojun Liao 已提交
368
    subplan->execNode.epset = blocks->vg.epset;
369
    subplan->pDataSink  = createDataInserter(pCxt, blocks, NULL);
370 371 372
    subplan->pNode      = NULL;
    subplan->type       = QUERY_TYPE_MODIFY;
    subplan->msgType    = pPayload->msgType;
373
    subplan->id.queryId = pCxt->pDag->queryId;
H
Haojun Liao 已提交
374

X
Xiaoyu Wang 已提交
375
    RECOVERY_CURRENT_SUBPLAN(pCxt);
X
Xiaoyu Wang 已提交
376 377 378
  }
}

379
static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) {
380
  if (QNODE_MODIFY == pRoot->info.type) {
H
Haojun Liao 已提交
381
    splitModificationOpSubPlan(pCxt, pRoot);
X
Xiaoyu Wang 已提交
382
  } else {
D
dapan 已提交
383
    SSubplan* subplan  = initSubplan(pCxt, QUERY_TYPE_SCAN);
X
Xiaoyu Wang 已提交
384
    ++(pCxt->nextId.templateId);
H
Haojun Liao 已提交
385 386 387

    subplan->msgType   = TDMT_VND_QUERY;
    subplan->pNode     = createPhyNode(pCxt, pRoot);
388
    subplan->pDataSink = createDataDispatcher(pCxt, pRoot, subplan->pNode);
X
Xiaoyu Wang 已提交
389
  }
390 391 392
  // todo deal subquery
}

393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419
static void postCreateDag(SQueryPlanNode* pQueryNode, SQueryDag* pDag, SArray* pNodeList) {
  // The exchange operator is not necessary, in case of the stream scan.
  // Here we need to remove it from the DAG.
  if (pQueryNode->info.type == QNODE_STREAMSCAN) {
    SArray* pRootLevel = taosArrayGetP(pDag->pSubplans, 0);
    SSubplan *pSubplan = taosArrayGetP(pRootLevel, 0);

    if (pSubplan->pNode->info.type == OP_Exchange) {
      ASSERT(taosArrayGetSize(pRootLevel) == 1);

      taosArrayRemove(pDag->pSubplans, 0);
      // And then update the number of the subplans.
      pDag->numOfSubplans -= 1;
    }
  } else {
    // Traverse the dag again to acquire the execution node.
    if (pNodeList != NULL) {
      SArray** pSubLevel = taosArrayGetLast(pDag->pSubplans);
      size_t   num = taosArrayGetSize(*pSubLevel);
      for (int32_t j = 0; j < num; ++j) {
        SSubplan* pPlan = taosArrayGetP(*pSubLevel, j);
        taosArrayPush(pNodeList, &pPlan->execNode);
      }
    }
  }
}

420
int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag, SArray* pNodeList, uint64_t requestId) {
X
Xiaoyu Wang 已提交
421 422 423
  TRY(TSDB_MAX_TAG_CONDITIONS) {
    SPlanContext context = {
      .pCatalog = pCatalog,
H
Haojun Liao 已提交
424
      .pDag     = validPointer(calloc(1, sizeof(SQueryDag))),
X
Xiaoyu Wang 已提交
425
      .pCurrentSubplan = NULL,
426 427
       //The unsigned Id starting from 1 would be better
      .nextId   = {.queryId = requestId, .subplanId = 1, .templateId = 1},
X
Xiaoyu Wang 已提交
428
    };
H
Haojun Liao 已提交
429

X
Xiaoyu Wang 已提交
430
    *pDag = context.pDag;
431 432
    context.pDag->queryId = requestId;

433
    context.pDag->pSubplans = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
X
Xiaoyu Wang 已提交
434 435 436 437 438 439
    createSubplanByLevel(&context, pQueryNode);
  } CATCH(code) {
    CLEANUP_EXECUTE();
    terrno = code;
    return TSDB_CODE_FAILED;
  } END_TRY
440

441
  postCreateDag(pQueryNode, *pDag, pNodeList);
442
  return TSDB_CODE_SUCCESS;
443
}
X
Xiaoyu Wang 已提交
444

H
Haojun Liao 已提交
445
void setExchangSourceNode(uint64_t templateId, SDownstreamSource *pSource, SPhyNode* pNode) {
446 447 448 449 450 451
  if (NULL == pNode) {
    return;
  }
  if (OP_Exchange == pNode->info.type) {
    SExchangePhyNode* pExchange = (SExchangePhyNode*)pNode;
    if (templateId == pExchange->srcTemplateId) {
H
Haojun Liao 已提交
452
      taosArrayPush(pExchange->pSrcEndPoints, pSource);
453 454
    }
  }
H
Haojun Liao 已提交
455

456 457 458
  if (pNode->pChildren != NULL) {
    size_t size = taosArrayGetSize(pNode->pChildren);
    for(int32_t i = 0; i < size; ++i) {
H
Haojun Liao 已提交
459
      setExchangSourceNode(templateId, pSource, taosArrayGetP(pNode->pChildren, i));
460 461 462 463
    }
  }
}

H
Haojun Liao 已提交
464 465
void setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource) {
  setExchangSourceNode(templateId, pSource, subplan->pNode);
X
Xiaoyu Wang 已提交
466
}
467 468 469 470 471 472

static void destroyDataSinkNode(SDataSink* pSinkNode) {
  if (pSinkNode == NULL) {
    return;
  }

X
Xiaoyu Wang 已提交
473
  if (queryNodeType(pSinkNode) == DSINK_Dispatch) {
474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492
    SDataDispatcher* pDdSink = (SDataDispatcher*)pSinkNode;
    tfree(pDdSink->sink.schema.pSchema);
  }

  tfree(pSinkNode);
}

void qDestroySubplan(SSubplan* pSubplan) {
  if (pSubplan == NULL) {
    return;
  }

  taosArrayDestroy(pSubplan->pChildren);
  taosArrayDestroy(pSubplan->pParents);
  destroyDataSinkNode(pSubplan->pDataSink);
  cleanupPhyNode(pSubplan->pNode);

  tfree(pSubplan);
}
X
Xiaoyu Wang 已提交
493 494

#endif
X
Xiaoyu Wang 已提交
495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974

#include "plannerInt.h"

#include "functionMgt.h"

typedef struct SSubLogicPlan {
  SNode* pRoot; // SLogicNode
  bool haveSuperTable;
  bool haveSystemTable;
} SSubLogicPlan;

int32_t splitLogicPlan(SSubLogicPlan* pLogicPlan) {
  // todo
  return TSDB_CODE_SUCCESS;
}

typedef struct SSlotIndex {
  int16_t dataBlockId;
  int16_t slotId;
} SSlotIndex;

typedef struct SPhysiPlanContext {
  int32_t errCode;
  int16_t nextDataBlockId;
  SArray* pLocationHelper;
} SPhysiPlanContext;

static int32_t getSlotKey(SNode* pNode, char* pKey) {
  if (QUERY_NODE_COLUMN == nodeType(pNode)) {
    SColumnNode* pCol = (SColumnNode*)pNode;
    if ('\0' == pCol->tableAlias[0]) {
      return sprintf(pKey, "%s", pCol->colName);
    }
    return sprintf(pKey, "%s.%s", pCol->tableAlias, pCol->colName);
  }
  return sprintf(pKey, "%s", ((SExprNode*)pNode)->aliasName);
}

static SNode* createSlotDesc(SPhysiPlanContext* pCxt, const SNode* pNode, int16_t slotId) {
  SSlotDescNode* pSlot = (SSlotDescNode*)nodesMakeNode(QUERY_NODE_SLOT_DESC);
  CHECK_ALLOC(pSlot, NULL);
  pSlot->slotId = slotId;
  pSlot->dataType = ((SExprNode*)pNode)->resType;
  pSlot->reserve = false;
  pSlot->output = false;
  return (SNode*)pSlot;
}

static SNode* createTarget(SNode* pNode, int16_t dataBlockId, int16_t slotId) {
  STargetNode* pTarget = (STargetNode*)nodesMakeNode(QUERY_NODE_TARGET);
  if (NULL == pTarget) {
    return NULL;
  }
  pTarget->dataBlockId = dataBlockId;
  pTarget->slotId = slotId;
  pTarget->pExpr = pNode;
  return (SNode*)pTarget;
}

static int32_t addDataBlockDesc(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) {
  SHashObj* pHash = NULL;
  if (NULL == pDataBlockDesc->pSlots) {
    pDataBlockDesc->pSlots = nodesMakeList();
    CHECK_ALLOC(pDataBlockDesc->pSlots, TSDB_CODE_OUT_OF_MEMORY);

    pHash = taosHashInit(LIST_LENGTH(pList), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
    CHECK_ALLOC(pHash, TSDB_CODE_OUT_OF_MEMORY);
    if (NULL == taosArrayInsert(pCxt->pLocationHelper, pDataBlockDesc->dataBlockId, &pHash)) {
      taosHashCleanup(pHash);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  } else {
    pHash = taosArrayGetP(pCxt->pLocationHelper, pDataBlockDesc->dataBlockId);
  }
  
  SNode* pNode = NULL;
  int16_t slotId = taosHashGetSize(pHash);
  FOREACH(pNode, pList) {
    SNode* pSlot = createSlotDesc(pCxt, pNode, slotId);
    CHECK_ALLOC(pSlot, TSDB_CODE_OUT_OF_MEMORY);
    if (TSDB_CODE_SUCCESS != nodesListAppend(pDataBlockDesc->pSlots, (SNode*)pSlot)) {
      nodesDestroyNode(pSlot);
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    SSlotIndex index = { .dataBlockId = pDataBlockDesc->dataBlockId, .slotId = slotId };
    char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN];
    int32_t len = getSlotKey(pNode, name);
    CHECK_CODE(taosHashPut(pHash, name, len, &index, sizeof(SSlotIndex)), TSDB_CODE_OUT_OF_MEMORY);

    SNode* pTarget = createTarget(pNode, pDataBlockDesc->dataBlockId, slotId);
    CHECK_ALLOC(pTarget, TSDB_CODE_OUT_OF_MEMORY);
    REPLACE_NODE(pTarget);
  
    ++slotId;
  }
  return TSDB_CODE_SUCCESS;
}

typedef struct SSetSlotIdCxt {
  int32_t errCode;
  SHashObj* pLeftHash;
  SHashObj* pRightHash;
} SSetSlotIdCxt;

static EDealRes doSetSlotId(SNode* pNode, void* pContext) {
  if (QUERY_NODE_COLUMN == nodeType(pNode) && 0 != strcmp(((SColumnNode*)pNode)->colName, "*")) {
    SSetSlotIdCxt* pCxt = (SSetSlotIdCxt*)pContext;
    char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN];
    int32_t len = getSlotKey(pNode, name);
    SSlotIndex* pIndex = taosHashGet(pCxt->pLeftHash, name, len);
    if (NULL == pIndex) {
      pIndex = taosHashGet(pCxt->pRightHash, name, len);
    }
    // pIndex is definitely not NULL, otherwise it is a bug
    ((SColumnNode*)pNode)->dataBlockId = pIndex->dataBlockId;
    ((SColumnNode*)pNode)->slotId = pIndex->slotId;
    CHECK_ALLOC(pNode, DEAL_RES_ERROR);
    return DEAL_RES_IGNORE_CHILD;
  }
  return DEAL_RES_CONTINUE;
}

static SNode* setNodeSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, int16_t rightDataBlockId, SNode* pNode) {
  SNode* pRes = nodesCloneNode(pNode);
  CHECK_ALLOC(pRes, NULL);
  SSetSlotIdCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pLeftHash = taosArrayGetP(pCxt->pLocationHelper, leftDataBlockId),
      .pRightHash = (rightDataBlockId < 0 ? NULL : taosArrayGetP(pCxt->pLocationHelper, rightDataBlockId)) };
  nodesWalkNode(pRes, doSetSlotId, &cxt);
  if (TSDB_CODE_SUCCESS != cxt.errCode) {
    nodesDestroyNode(pRes);
    return NULL;
  }
  return pRes;
}

static SNodeList* setListSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, int16_t rightDataBlockId, SNodeList* pList) {
  SNodeList* pRes = nodesCloneList(pList);
  CHECK_ALLOC(pRes, NULL);
  SSetSlotIdCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pLeftHash = taosArrayGetP(pCxt->pLocationHelper, leftDataBlockId),
      .pRightHash = (rightDataBlockId < 0 ? NULL : taosArrayGetP(pCxt->pLocationHelper, rightDataBlockId)) };
  nodesWalkList(pRes, doSetSlotId, &cxt);
  if (TSDB_CODE_SUCCESS != cxt.errCode) {
    nodesDestroyList(pRes);
    return NULL;
  }
  return pRes;
}

static SPhysiNode* makePhysiNode(SPhysiPlanContext* pCxt, ENodeType type) {
  SPhysiNode* pPhysiNode = (SPhysiNode*)nodesMakeNode(type);
  if (NULL == pPhysiNode) {
    return NULL;
  }
  pPhysiNode->outputDataBlockDesc.dataBlockId = pCxt->nextDataBlockId++;
  pPhysiNode->outputDataBlockDesc.type = QUERY_NODE_DATABLOCK_DESC;
  return pPhysiNode;
}

static int32_t setConditionsSlotId(SPhysiPlanContext* pCxt, const SLogicNode* pLogicNode, SPhysiNode* pPhysiNode) {
  if (NULL != pLogicNode->pConditions) {
    pPhysiNode->pConditions = setNodeSlotId(pCxt, pPhysiNode->outputDataBlockDesc.dataBlockId, -1, pLogicNode->pConditions);
    CHECK_ALLOC(pPhysiNode->pConditions, TSDB_CODE_OUT_OF_MEMORY);
  }
  return TSDB_CODE_SUCCESS;
}

static int32_t setSlotOutput(SPhysiPlanContext* pCxt, SNodeList* pTargets, SDataBlockDescNode* pDataBlockDesc) {
  SHashObj* pHash = taosArrayGetP(pCxt->pLocationHelper, pDataBlockDesc->dataBlockId);
  char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN];
  SNode* pNode;
  FOREACH(pNode, pTargets) {
    int32_t len = getSlotKey(pNode, name);
    SSlotIndex* pIndex = taosHashGet(pHash, name, len);
    ((SSlotDescNode*)nodesListGetNode(pDataBlockDesc->pSlots, pIndex->slotId))->output = true;
  }
  
  return TSDB_CODE_SUCCESS;
}

static int32_t initScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode, SScanPhysiNode* pScanPhysiNode) {
  if (NULL != pScanLogicNode->pScanCols) {
    pScanPhysiNode->pScanCols = nodesCloneList(pScanLogicNode->pScanCols);
    CHECK_ALLOC(pScanPhysiNode->pScanCols, TSDB_CODE_OUT_OF_MEMORY);
  }
  // Data block describe also needs to be set without scanning column, such as SELECT COUNT(*) FROM t
  CHECK_CODE(addDataBlockDesc(pCxt, pScanPhysiNode->pScanCols, &pScanPhysiNode->node.outputDataBlockDesc), TSDB_CODE_OUT_OF_MEMORY);

  CHECK_CODE(setConditionsSlotId(pCxt, (const SLogicNode*)pScanLogicNode, (SPhysiNode*)pScanPhysiNode), TSDB_CODE_OUT_OF_MEMORY);

  CHECK_CODE(setSlotOutput(pCxt, pScanLogicNode->node.pTargets, &pScanPhysiNode->node.outputDataBlockDesc), TSDB_CODE_OUT_OF_MEMORY);

  pScanPhysiNode->uid = pScanLogicNode->pMeta->uid;
  pScanPhysiNode->tableType = pScanLogicNode->pMeta->tableType;
  pScanPhysiNode->order = TSDB_ORDER_ASC;
  pScanPhysiNode->count = 1;
  pScanPhysiNode->reverse = 0;

  return TSDB_CODE_SUCCESS;
}

static SPhysiNode* createTagScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode) {
  STagScanPhysiNode* pTagScan = (STagScanPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN);
  CHECK_ALLOC(pTagScan, NULL);
  CHECK_CODE(initScanPhysiNode(pCxt, pScanLogicNode, (SScanPhysiNode*)pTagScan), (SPhysiNode*)pTagScan);
  return (SPhysiNode*)pTagScan;
}

static SPhysiNode* createTableScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode) {
  STableScanPhysiNode* pTableScan = (STableScanPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
  CHECK_ALLOC(pTableScan, NULL);
  CHECK_CODE(initScanPhysiNode(pCxt, pScanLogicNode, (SScanPhysiNode*)pTableScan), (SPhysiNode*)pTableScan);
  pTableScan->scanFlag = pScanLogicNode->scanFlag;
  pTableScan->scanRange = pScanLogicNode->scanRange;
  return (SPhysiNode*)pTableScan;
}

static SPhysiNode* createScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode) {
  switch (pScanLogicNode->scanType) {
    case SCAN_TYPE_TAG:
      return createTagScanPhysiNode(pCxt, pScanLogicNode);
    case SCAN_TYPE_TABLE:
      return createTableScanPhysiNode(pCxt, pScanLogicNode);
    case SCAN_TYPE_STABLE:
    case SCAN_TYPE_STREAM:
      break;
    default:
      break;
  }
  return NULL;
}

static SNodeList* createJoinOutputCols(SPhysiPlanContext* pCxt, SDataBlockDescNode* pLeftDesc, SDataBlockDescNode* pRightDesc) {
  SNodeList* pCols = nodesMakeList();
  CHECK_ALLOC(pCols, NULL);
  SNode* pNode;
  FOREACH(pNode, pLeftDesc->pSlots) {
    SSlotDescNode* pSlot = (SSlotDescNode*)pNode;
    SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
    if (NULL == pCol) {
      goto error;
    }
    pCol->node.resType = pSlot->dataType;
    pCol->dataBlockId = pLeftDesc->dataBlockId;
    pCol->slotId = pSlot->slotId;
    pCol->colId = -1;
    if (TSDB_CODE_SUCCESS != nodesListAppend(pCols, (SNode*)pCol)) {
      goto error;
    }
  }
  FOREACH(pNode, pRightDesc->pSlots) {
    SSlotDescNode* pSlot = (SSlotDescNode*)pNode;
    SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
    if (NULL == pCol) {
      goto error;
    }
    pCol->node.resType = pSlot->dataType;
    pCol->dataBlockId = pRightDesc->dataBlockId;
    pCol->slotId = pSlot->slotId;
    pCol->colId = -1;
    if (TSDB_CODE_SUCCESS != nodesListAppend(pCols, (SNode*)pCol)) {
      goto error;
    }
  }
  return pCols;
error:
  nodesDestroyList(pCols);
  return NULL;
}

static SPhysiNode* createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode) {
  SJoinPhysiNode* pJoin = (SJoinPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_JOIN);
  CHECK_ALLOC(pJoin, NULL);

  SDataBlockDescNode* pLeftDesc = &((SPhysiNode*)nodesListGetNode(pChildren, 0))->outputDataBlockDesc;
  SDataBlockDescNode* pRightDesc = &((SPhysiNode*)nodesListGetNode(pChildren, 1))->outputDataBlockDesc;
  pJoin->pOnConditions = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pOnConditions);
  CHECK_ALLOC(pJoin->pOnConditions, (SPhysiNode*)pJoin);

  pJoin->pTargets = createJoinOutputCols(pCxt, pLeftDesc, pRightDesc);
  CHECK_ALLOC(pJoin->pTargets, (SPhysiNode*)pJoin);
  CHECK_CODE(addDataBlockDesc(pCxt, pJoin->pTargets, &pJoin->node.outputDataBlockDesc), (SPhysiNode*)pJoin);

  CHECK_CODE(setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin), (SPhysiNode*)pJoin);

  CHECK_CODE(setSlotOutput(pCxt, pJoinLogicNode->node.pTargets, &pJoin->node.outputDataBlockDesc), (SPhysiNode*)pJoin);

  return (SPhysiNode*)pJoin;
}

typedef struct SRewritePrecalcExprsCxt {
  int32_t errCode;
  int32_t planNodeId;
  int32_t rewriteId;
  SNodeList* pPrecalcExprs;
} SRewritePrecalcExprsCxt;

static EDealRes collectAndRewrite(SRewritePrecalcExprsCxt* pCxt, SNode** pNode) {
  SNode* pExpr = nodesCloneNode(*pNode);
  CHECK_ALLOC(pExpr, DEAL_RES_ERROR);
  if (nodesListAppend(pCxt->pPrecalcExprs, pExpr)) {
    nodesDestroyNode(pExpr);
    return DEAL_RES_ERROR;
  }
  SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
  if (NULL == pCol) {
    nodesDestroyNode(pExpr);
    return DEAL_RES_ERROR;
  }
  SExprNode* pRewrittenExpr = (SExprNode*)pExpr;
  pCol->node.resType = pRewrittenExpr->resType;
  if ('\0' != pRewrittenExpr->aliasName[0]) {
    strcpy(pCol->colName, pRewrittenExpr->aliasName);
  } else {
    snprintf(pRewrittenExpr->aliasName, sizeof(pRewrittenExpr->aliasName), "#expr_%d_%d", pCxt->planNodeId, pCxt->rewriteId);
    strcpy(pCol->colName, pRewrittenExpr->aliasName);
  }
  nodesDestroyNode(*pNode);
  *pNode = (SNode*)pCol;
  return DEAL_RES_IGNORE_CHILD;
}

static EDealRes doRewritePrecalcExprs(SNode** pNode, void* pContext) {
  SRewritePrecalcExprsCxt* pCxt = (SRewritePrecalcExprsCxt*)pContext;
  switch (nodeType(*pNode)) {
    case QUERY_NODE_OPERATOR:
    case QUERY_NODE_LOGIC_CONDITION: {
      return collectAndRewrite(pContext, pNode);
    }
    case QUERY_NODE_FUNCTION: {
      if (!fmIsAggFunc(((SFunctionNode*)(*pNode))->funcId)) {
        return collectAndRewrite(pContext, pNode);
      }
    }
    default:
      break;
  }
  return DEAL_RES_CONTINUE;
}

static int32_t rewritePrecalcExprs(SPhysiPlanContext* pCxt, SNodeList* pList, SNodeList** pPrecalcExprs, SNodeList** pRewrittenList) {
  if (NULL == pList) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == *pPrecalcExprs) {
    *pPrecalcExprs = nodesMakeList();
    CHECK_ALLOC(*pPrecalcExprs, TSDB_CODE_OUT_OF_MEMORY);
  }
  if (NULL == *pRewrittenList) {
    *pRewrittenList = nodesMakeList();
    CHECK_ALLOC(*pRewrittenList, TSDB_CODE_OUT_OF_MEMORY);
  }
  SNode* pNode = NULL;
  FOREACH(pNode, pList) {
    SNode* pNew = NULL;
    if (QUERY_NODE_GROUPING_SET == nodeType(pNode)) {
      pNew = nodesCloneNode(nodesListGetNode(((SGroupingSetNode*)pNode)->pParameterList, 0));
    } else {
      pNew = nodesCloneNode(pNode);
    }
    CHECK_ALLOC(pNew, TSDB_CODE_OUT_OF_MEMORY);
    CHECK_CODE(nodesListAppend(*pRewrittenList, pNew), TSDB_CODE_OUT_OF_MEMORY);
  }
  SRewritePrecalcExprsCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pPrecalcExprs = *pPrecalcExprs };
  nodesRewriteList(*pRewrittenList, doRewritePrecalcExprs, &cxt);
  if (0 == LIST_LENGTH(cxt.pPrecalcExprs)) {
    nodesDestroyList(cxt.pPrecalcExprs);
    *pPrecalcExprs = NULL;
  }
  return cxt.errCode;
}

static SPhysiNode* createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SAggLogicNode* pAggLogicNode) {
  SAggPhysiNode* pAgg = (SAggPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_AGG);
  CHECK_ALLOC(pAgg, NULL);

  SNodeList* pPrecalcExprs = NULL;
  SNodeList* pGroupKeys = NULL;
  SNodeList* pAggFuncs = NULL;
  CHECK_CODE(rewritePrecalcExprs(pCxt, pAggLogicNode->pGroupKeys, &pPrecalcExprs, &pGroupKeys), (SPhysiNode*)pAgg);
  CHECK_CODE(rewritePrecalcExprs(pCxt, pAggLogicNode->pAggFuncs, &pPrecalcExprs, &pAggFuncs), (SPhysiNode*)pAgg);

  SDataBlockDescNode* pChildTupe = &(((SPhysiNode*)nodesListGetNode(pChildren, 0))->outputDataBlockDesc);
  // push down expression to outputDataBlockDesc of child node
  if (NULL != pPrecalcExprs) {
    pAgg->pExprs = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs);
    CHECK_ALLOC(pAgg->pExprs, (SPhysiNode*)pAgg);
    CHECK_CODE(addDataBlockDesc(pCxt, pAgg->pExprs, pChildTupe), (SPhysiNode*)pAgg);
  }

  if (NULL != pGroupKeys) {
    pAgg->pGroupKeys = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pGroupKeys);
    CHECK_ALLOC(pAgg->pGroupKeys, (SPhysiNode*)pAgg);
    CHECK_CODE(addDataBlockDesc(pCxt, pAgg->pGroupKeys, &pAgg->node.outputDataBlockDesc), (SPhysiNode*)pAgg);
  }

  if (NULL != pAggFuncs) {
    pAgg->pAggFuncs = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pAggFuncs);
    CHECK_ALLOC(pAgg->pAggFuncs, (SPhysiNode*)pAgg);
    CHECK_CODE(addDataBlockDesc(pCxt, pAgg->pAggFuncs, &pAgg->node.outputDataBlockDesc), (SPhysiNode*)pAgg);
  }

  CHECK_CODE(setConditionsSlotId(pCxt, (const SLogicNode*)pAggLogicNode, (SPhysiNode*)pAgg), (SPhysiNode*)pAgg);

  CHECK_CODE(setSlotOutput(pCxt, pAggLogicNode->node.pTargets, &pAgg->node.outputDataBlockDesc), (SPhysiNode*)pAgg);

  return (SPhysiNode*)pAgg;
}

static SPhysiNode* createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SProjectLogicNode* pProjectLogicNode) {
  SProjectPhysiNode* pProject = (SProjectPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_PROJECT);
  CHECK_ALLOC(pProject, NULL);

  pProject->pProjections = setListSlotId(pCxt, ((SPhysiNode*)nodesListGetNode(pChildren, 0))->outputDataBlockDesc.dataBlockId, -1, pProjectLogicNode->pProjections);
  CHECK_ALLOC(pProject->pProjections, (SPhysiNode*)pProject);
  CHECK_CODE(addDataBlockDesc(pCxt, pProject->pProjections, &pProject->node.outputDataBlockDesc), (SPhysiNode*)pProject);

  CHECK_CODE(setConditionsSlotId(pCxt, (const SLogicNode*)pProjectLogicNode, (SPhysiNode*)pProject), (SPhysiNode*)pProject);

  return (SPhysiNode*)pProject;
}

static SPhysiNode* createPhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicPlan) {
  SNodeList* pChildren = nodesMakeList();
  CHECK_ALLOC(pChildren, NULL);

  SNode* pLogicChild;
  FOREACH(pLogicChild, pLogicPlan->pChildren) {
    SNode* pChildPhyNode = (SNode*)createPhysiNode(pCxt, (SLogicNode*)pLogicChild);
    if (TSDB_CODE_SUCCESS != nodesListAppend(pChildren, pChildPhyNode)) {
      pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
      nodesDestroyList(pChildren);
      return NULL;
    }
  }

  SPhysiNode* pPhyNode = NULL;
  switch (nodeType(pLogicPlan)) {
    case QUERY_NODE_LOGIC_PLAN_SCAN:
      pPhyNode = createScanPhysiNode(pCxt, (SScanLogicNode*)pLogicPlan);
      break;
    case QUERY_NODE_LOGIC_PLAN_JOIN:
      pPhyNode = createJoinPhysiNode(pCxt, pChildren, (SJoinLogicNode*)pLogicPlan);
      break;
    case QUERY_NODE_LOGIC_PLAN_AGG:
      pPhyNode = createAggPhysiNode(pCxt, pChildren, (SAggLogicNode*)pLogicPlan);
      break;
    case QUERY_NODE_LOGIC_PLAN_PROJECT:
      pPhyNode = createProjectPhysiNode(pCxt, pChildren, (SProjectLogicNode*)pLogicPlan);
      break;
    default:
      break;
  }

  pPhyNode->pChildren = pChildren;
  SNode* pChild;
  FOREACH(pChild, pPhyNode->pChildren) {
    ((SPhysiNode*)pChild)->pParent = pPhyNode;
  }

  return pPhyNode;
}

int32_t createPhysiPlan(SLogicNode* pLogicNode, SPhysiNode** pPhyNode) {
  SPhysiPlanContext cxt = { .errCode = TSDB_CODE_SUCCESS, .nextDataBlockId = 0, .pLocationHelper = taosArrayInit(32, POINTER_BYTES) };
  if (NULL == cxt.pLocationHelper) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  *pPhyNode = createPhysiNode(&cxt, pLogicNode);
  return cxt.errCode;
}

int32_t buildPhysiPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SQueryPlan** pPlan) {
  // split
  // scale out
  // maping
  // create
  return TSDB_CODE_SUCCESS;
}