planPhysiCreater.c 43.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

X
Xiaoyu Wang 已提交
16
#include "planInt.h"
X
Xiaoyu Wang 已提交
17 18 19

#include "functionMgt.h"

X
bugfix  
Xiaoyu Wang 已提交
20 21 22 23 24
// One slot registered under a slot key. Several slots may share the same key
// (see SSlotIndex.pSlotIdsInfo), so each one carries its own hand-out flag.
typedef struct SSlotIdInfo {
  int16_t slotId;  // id of the slot inside its data block
  bool set;        // starts false; getUnsetSlotId() flips it to true when it returns this slot
} SSlotIdInfo;

X
Xiaoyu Wang 已提交
25 26
typedef struct SSlotIndex {
  int16_t dataBlockId;
X
bugfix  
Xiaoyu Wang 已提交
27
  SArray* pSlotIdsInfo; // duplicate name slot
X
Xiaoyu Wang 已提交
28 29 30
} SSlotIndex;

typedef struct SPhysiPlanContext {
X
Xiaoyu Wang 已提交
31
  SPlanContext* pPlanCxt;
X
Xiaoyu Wang 已提交
32 33 34
  int32_t errCode;
  int16_t nextDataBlockId;
  SArray* pLocationHelper;
35
  SArray* pExecNodeList;
X
Xiaoyu Wang 已提交
36 37
} SPhysiPlanContext;

X
Xiaoyu Wang 已提交
38
// Formats the hash key that identifies the slot of pNode and writes it to pKey.
//   - with a statement name:       "<stmtName>.<aliasName>"
//   - column with a table alias:   "<tableAlias>.<colName>"
//   - column without table alias:  "<colName>"
//   - any other expression:        "<aliasName>"
// Returns the value of sprintf(), i.e. the key length without the terminating NUL.
// NOTE: the write is unbounded; the caller must supply a buffer large enough for the key.
static int32_t getSlotKey(SNode* pNode, const char* pStmtName, char* pKey) {
  const bool hasStmtName = (NULL != pStmtName);

  if (QUERY_NODE_COLUMN != nodeType(pNode)) {
    const char* pAlias = ((SExprNode*)pNode)->aliasName;
    return hasStmtName ? sprintf(pKey, "%s.%s", pStmtName, pAlias) : sprintf(pKey, "%s", pAlias);
  }

  SColumnNode* pColumn = (SColumnNode*)pNode;
  if (hasStmtName) {
    return sprintf(pKey, "%s.%s", pStmtName, pColumn->node.aliasName);
  }
  if ('\0' != pColumn->tableAlias[0]) {
    return sprintf(pKey, "%s.%s", pColumn->tableAlias, pColumn->colName);
  }
  return sprintf(pKey, "%s", pColumn->colName);
}

X
bugfix  
Xiaoyu Wang 已提交
56
// Allocates a slot descriptor for pNode: the slot takes the node's result type,
// the given slot id and output flag, and is never marked as reserved.
// Returns NULL when the node cannot be allocated.
static SNode* createSlotDesc(SPhysiPlanContext* pCxt, const SNode* pNode, int16_t slotId, bool output) {
  SSlotDescNode* pDesc = (SSlotDescNode*)nodesMakeNode(QUERY_NODE_SLOT_DESC);
  if (NULL != pDesc) {
    pDesc->slotId = slotId;
    pDesc->dataType = ((SExprNode*)pNode)->resType;
    pDesc->reserve = false;
    pDesc->output = output;
  }
  return (SNode*)pDesc;
}

X
bugfix  
Xiaoyu Wang 已提交
68
// Wraps pNode in a new target node bound to (dataBlockId, slotId) and stores it in *pOutput.
// pNode is referenced by the target, not copied.
// Returns TSDB_CODE_OUT_OF_MEMORY (leaving *pOutput untouched) if the target cannot be allocated.
static int32_t createTarget(SNode* pNode, int16_t dataBlockId, int16_t slotId, SNode** pOutput) {
  STargetNode* pNew = (STargetNode*)nodesMakeNode(QUERY_NODE_TARGET);
  if (NULL != pNew) {
    pNew->dataBlockId = dataBlockId;
    pNew->slotId = slotId;
    pNew->pExpr = pNode;
    *pOutput = (SNode*)pNew;
    return TSDB_CODE_SUCCESS;
  }
  return TSDB_CODE_OUT_OF_MEMORY;
}

X
bugfix  
Xiaoyu Wang 已提交
82
// Registers slotId under the key pName/len in pHash.
// If the key already exists, the slot is appended to that entry's slot list (duplicate name);
// otherwise a new SSlotIndex entry for dataBlockId is created and inserted.
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when an array cannot be created or grown,
// or the error returned by taosHashPut().
static int32_t putSlotToHashImpl(int16_t dataBlockId, int16_t slotId, const char* pName, int32_t len, SHashObj* pHash) {
  SSlotIdInfo info = { .slotId = slotId, .set = false };

  SSlotIndex* pIndex = taosHashGet(pHash, pName, len);
  if (NULL != pIndex) {
    // A failed push used to be ignored, silently losing the slot.
    if (NULL == taosArrayPush(pIndex->pSlotIdsInfo, &info)) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    return TSDB_CODE_SUCCESS;
  }

  SSlotIndex index = { .dataBlockId = dataBlockId, .pSlotIdsInfo = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SSlotIdInfo)) };
  if (NULL == index.pSlotIdsInfo) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  if (NULL == taosArrayPush(index.pSlotIdsInfo, &info)) {
    taosArrayDestroy(index.pSlotIdsInfo);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = taosHashPut(pHash, pName, len, &index, sizeof(SSlotIndex));
  if (TSDB_CODE_SUCCESS != code) {
    // The hash did not take the entry, so nothing else references the array: release it here.
    taosArrayDestroy(index.pSlotIdsInfo);
  }
  return code;
}

// Registers slotId in pHash under the key derived from pNode (no statement-name prefix).
static int32_t putSlotToHash(int16_t dataBlockId, int16_t slotId, SNode* pNode, SHashObj* pHash) {
  char key[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN];
  const int32_t keyLen = getSlotKey(pNode, NULL, key);
  return putSlotToHashImpl(dataBlockId, slotId, key, keyLen, pHash);
}

// Creates the key -> SSlotIndex hash for a data block and stores it in
// pCxt->pLocationHelper at index dataBlockId. On success *pDescHash receives the hash.
// Returns TSDB_CODE_OUT_OF_MEMORY if the hash cannot be created or inserted
// (the hash is cleaned up in the latter case).
static int32_t createDataBlockDescHash(SPhysiPlanContext* pCxt, int32_t capacity, int16_t dataBlockId, SHashObj** pDescHash) {
  SHashObj* pNewHash = taosHashInit(capacity, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (NULL == pNewHash) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  if (NULL != taosArrayInsert(pCxt->pLocationHelper, dataBlockId, &pNewHash)) {
    *pDescHash = pNewHash;
    return TSDB_CODE_SUCCESS;
  }

  taosHashCleanup(pNewHash);
  return TSDB_CODE_OUT_OF_MEMORY;
}

// Creates the initial slot list of pDataBlockDesc: one output slot per node of pList,
// numbered from 0 in list order. Each slot is also registered in pHash, and the node's
// result size is added to both totalRowSize and outputRowSize.
// Stops at the first failure and returns its error code.
static int32_t buildDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc, SHashObj* pHash) {
  pDataBlockDesc->pSlots = nodesMakeList();
  if (NULL == pDataBlockDesc->pSlots) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int16_t nextId = 0;
  SNode*  pTarget = NULL;
  FOREACH(pTarget, pList) {
    int32_t code = nodesListStrictAppend(pDataBlockDesc->pSlots, createSlotDesc(pCxt, pTarget, nextId, true));
    if (TSDB_CODE_SUCCESS == code) {
      code = putSlotToHash(pDataBlockDesc->dataBlockId, nextId, pTarget, pHash);
    }
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }

    pDataBlockDesc->totalRowSize += ((SExprNode*)pTarget)->resType.bytes;
    pDataBlockDesc->outputRowSize += ((SExprNode*)pTarget)->resType.bytes;
    ++nextId;
  }
  return TSDB_CODE_SUCCESS;
}

// Allocates a data block descriptor with the next free data block id, creates its
// location hash and fills its slots from pList. On success *pDataBlockDesc receives
// the descriptor; on failure the descriptor is destroyed and the error code returned.
static int32_t createDataBlockDesc(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode** pDataBlockDesc) {
  SDataBlockDescNode* pNewDesc = nodesMakeNode(QUERY_NODE_DATABLOCK_DESC);
  if (NULL == pNewDesc) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pNewDesc->dataBlockId = pCxt->nextDataBlockId++;

  SHashObj* pSlotHash = NULL;
  int32_t code = createDataBlockDescHash(pCxt, LIST_LENGTH(pList), pNewDesc->dataBlockId, &pSlotHash);
  if (TSDB_CODE_SUCCESS == code) {
    code = buildDataBlockSlots(pCxt, pList, pNewDesc, pSlotHash);
  }

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode(pNewDesc);
    return code;
  }

  *pDataBlockDesc = pNewDesc;
  return TSDB_CODE_SUCCESS;
}

X
bugfix  
Xiaoyu Wang 已提交
166 167 168 169 170 171 172 173 174 175 176 177
// Returns the id of the first slot in pSlotIdsInfo that has not been handed out yet and
// marks it as handed out. When every slot is already marked, the first slot's id is returned.
static int16_t getUnsetSlotId(const SArray* pSlotIdsInfo) {
  const int32_t count = taosArrayGetSize(pSlotIdsInfo);
  int32_t idx = 0;
  while (idx < count) {
    SSlotIdInfo* pEntry = taosArrayGet(pSlotIdsInfo, idx);
    if (!pEntry->set) {
      pEntry->set = true;
      return pEntry->slotId;
    }
    ++idx;
  }

  SSlotIdInfo* pFirst = taosArrayGet(pSlotIdsInfo, 0);
  return pFirst->slotId;
}

X
Xiaoyu Wang 已提交
178
// Binds every node of pList to a slot of pDataBlockDesc and replaces the list entry
// with a target node that wraps it.
//   - For an ORDER BY expression the slot key is derived from the wrapped expression.
//   - If the key is not registered yet, a new slot is appended (flagged with `output`),
//     registered in the block's hash, and the row sizes are increased.
//   - If the key is already registered, the next not-yet-used slot with that key is reused.
// pStmtName, when not NULL, is used as key prefix (see getSlotKey).
// A NULL pList is a no-op. Returns the first error encountered.
static int32_t addDataBlockSlotsImpl(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc, const char* pStmtName, bool output) {
  if (NULL == pList) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  SHashObj* pHash = taosArrayGetP(pCxt->pLocationHelper, pDataBlockDesc->dataBlockId);
  // The hash holds one entry per distinct key, so its size undercounts the slots whenever
  // names are duplicated. The slot list itself is the authoritative count of ids in use.
  int16_t nextSlotId = LIST_LENGTH(pDataBlockDesc->pSlots), slotId = 0;
  SNode* pNode = NULL;
  FOREACH(pNode, pList) {
    SNode* pExpr = QUERY_NODE_ORDER_BY_EXPR == nodeType(pNode) ? ((SOrderByExprNode*)pNode)->pExpr : pNode;
    char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN] = {0};
    int32_t len = getSlotKey(pExpr, pStmtName, name);
    SSlotIndex* pIndex = taosHashGet(pHash, name, len);
    if (NULL == pIndex) {
      code = nodesListStrictAppend(pDataBlockDesc->pSlots, createSlotDesc(pCxt, pExpr, nextSlotId, output));
      if (TSDB_CODE_SUCCESS == code) {
        code = putSlotToHashImpl(pDataBlockDesc->dataBlockId, nextSlotId, name, len, pHash);
      }
      if (TSDB_CODE_SUCCESS != code) {
        // Do not account for a slot that was not fully registered.
        break;
      }
      pDataBlockDesc->totalRowSize += ((SExprNode*)pExpr)->resType.bytes;
      if (output) {
        pDataBlockDesc->outputRowSize += ((SExprNode*)pExpr)->resType.bytes;
      }
      slotId = nextSlotId;
      ++nextSlotId;
    } else {
      slotId = getUnsetSlotId(pIndex->pSlotIdsInfo);
    }

    SNode* pTarget = NULL;
    code = createTarget(pNode, pDataBlockDesc->dataBlockId, slotId, &pTarget);
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
    REPLACE_NODE(pTarget);
  }
  return code;
}

// Binds the nodes of pList to slots of pDataBlockDesc without a statement-name prefix;
// slots created on the way are flagged as non-output.
static int32_t addDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) {
  const char* pNoStmtName = NULL;
  const bool  isOutput = false;
  return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, pNoStmtName, isOutput);
}

226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242
// Single-node variant of addDataBlockSlots(): binds *pNode to a slot of pDataBlockDesc and,
// on success, replaces *pNode with the node that ends up in the list (the target wrapper).
// A NULL pNode or NULL *pNode is a no-op.
static int32_t addDataBlockSlot(SPhysiPlanContext* pCxt, SNode** pNode, SDataBlockDescNode* pDataBlockDesc) {
  if (NULL != pNode && NULL != *pNode) {
    SNodeList* pSingle = NULL;
    int32_t code = nodesListMakeAppend(&pSingle, *pNode);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlots(pCxt, pSingle, pDataBlockDesc);
    }
    if (TSDB_CODE_SUCCESS == code) {
      *pNode = nodesListGetNode(pSingle, 0);
    }
    // Only the temporary list is released; the node it carried is handed back via *pNode.
    nodesClearList(pSingle);
    return code;
  }
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
243 244
// Binds projection nodes to slots of pDataBlockDesc using pStmtName as key prefix;
// slots created on the way are flagged as output.
static int32_t addDataBlockSlotsForProject(SPhysiPlanContext* pCxt, const char* pStmtName, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) {
  const bool isOutput = true;
  return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, pStmtName, isOutput);
}

// Binds the nodes of pList to slots of pDataBlockDesc without a statement-name prefix;
// slots created on the way are flagged as output.
static int32_t pushdownDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) {
  const char* pNoStmtName = NULL;
  const bool  isOutput = true;
  return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, pNoStmtName, isOutput);
}

// Walker context for doSetSlotId().
typedef struct SSetSlotIdCxt {
  int32_t errCode;       // set to TSDB_CODE_PLAN_INTERNAL_ERROR when a column is found in neither hash
  SHashObj* pLeftHash;   // location hash searched first
  SHashObj* pRightHash;  // location hash searched second; NULL when no right data block id was given
} SSetSlotIdCxt;

// Expression walker callback: resolves every column node (other than "*") to the data block
// and slot registered for its key, looking in the left hash first and then in the right one.
// The first slot registered under the key is used.
// A column found in neither hash is a planner bug: errCode is set and the walk is aborted.
static EDealRes doSetSlotId(SNode* pNode, void* pContext) {
  if (QUERY_NODE_COLUMN != nodeType(pNode)) {
    return DEAL_RES_CONTINUE;
  }
  SColumnNode* pCol = (SColumnNode*)pNode;
  if (0 == strcmp(pCol->colName, "*")) {
    return DEAL_RES_CONTINUE;
  }

  SSetSlotIdCxt* pCxt = (SSetSlotIdCxt*)pContext;
  char key[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN];
  int32_t keyLen = getSlotKey(pNode, NULL, key);

  SSlotIndex* pFound = taosHashGet(pCxt->pLeftHash, key, keyLen);
  if (NULL == pFound) {
    pFound = taosHashGet(pCxt->pRightHash, key, keyLen);
  }
  if (NULL == pFound) {
    pCxt->errCode = TSDB_CODE_PLAN_INTERNAL_ERROR;
    return DEAL_RES_ERROR;
  }

  pCol->dataBlockId = pFound->dataBlockId;
  SSlotIdInfo* pFirst = taosArrayGet(pFound->pSlotIdsInfo, 0);
  pCol->slotId = pFirst->slotId;
  return DEAL_RES_IGNORE_CHILD;
}

X
Xiaoyu Wang 已提交
278
// Clones pNode and resolves the columns of the clone against the location hashes of the
// left data block and, when rightDataBlockId >= 0, of the right data block.
// On success *pOutput receives the clone; on failure the clone is destroyed.
static int32_t setNodeSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, int16_t rightDataBlockId, SNode* pNode, SNode** pOutput) {
  SNode* pClone = nodesCloneNode(pNode);
  if (NULL == pClone) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SSetSlotIdCxt cxt = {
    .errCode = TSDB_CODE_SUCCESS,
    .pLeftHash = taosArrayGetP(pCxt->pLocationHelper, leftDataBlockId),
    .pRightHash = NULL
  };
  if (rightDataBlockId >= 0) {
    cxt.pRightHash = taosArrayGetP(pCxt->pLocationHelper, rightDataBlockId);
  }

  nodesWalkExpr(pClone, doSetSlotId, &cxt);
  if (TSDB_CODE_SUCCESS == cxt.errCode) {
    *pOutput = pClone;
    return TSDB_CODE_SUCCESS;
  }

  nodesDestroyNode(pClone);
  return cxt.errCode;
}

X
bugfix  
Xiaoyu Wang 已提交
299
// List variant of setNodeSlotId(): clones pList and resolves the columns of every cloned
// expression against the left (and, when rightDataBlockId >= 0, the right) location hash.
// On success *pOutput receives the cloned list; on failure the clone is destroyed.
static int32_t setListSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, int16_t rightDataBlockId, const SNodeList* pList, SNodeList** pOutput) {
  SNodeList* pClone = nodesCloneList(pList);
  if (NULL == pClone) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SSetSlotIdCxt cxt = {
    .errCode = TSDB_CODE_SUCCESS,
    .pLeftHash = taosArrayGetP(pCxt->pLocationHelper, leftDataBlockId),
    .pRightHash = NULL
  };
  if (rightDataBlockId >= 0) {
    cxt.pRightHash = taosArrayGetP(pCxt->pLocationHelper, rightDataBlockId);
  }

  nodesWalkExprs(pClone, doSetSlotId, &cxt);
  if (TSDB_CODE_SUCCESS == cxt.errCode) {
    *pOutput = pClone;
    return TSDB_CODE_SUCCESS;
  }

  nodesDestroyList(pClone);
  return cxt.errCode;
}

X
bugfix  
Xiaoyu Wang 已提交
319
// Allocates a physical node of the given type and builds its output data block descriptor
// from the targets of pLogicNode. Returns NULL if either step fails (the node is destroyed).
static SPhysiNode* makePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, ENodeType type) {
  SPhysiNode* pNew = (SPhysiNode*)nodesMakeNode(type);
  if (NULL != pNew &&
      TSDB_CODE_SUCCESS != createDataBlockDesc(pCxt, pLogicNode->pTargets, &pNew->pOutputDataBlockDesc)) {
    nodesDestroyNode(pNew);
    pNew = NULL;
  }
  return pNew;
}

// Copies the conditions of pLogicNode to pPhysiNode, resolving their columns against the
// physical node's own output data block. Nothing is done when there are no conditions.
static int32_t setConditionsSlotId(SPhysiPlanContext* pCxt, const SLogicNode* pLogicNode, SPhysiNode* pPhysiNode) {
  if (NULL == pLogicNode->pConditions) {
    return TSDB_CODE_SUCCESS;
  }
  const int16_t outputBlockId = pPhysiNode->pOutputDataBlockDesc->dataBlockId;
  return setNodeSlotId(pCxt, outputBlockId, -1, pLogicNode->pConditions, &pPhysiNode->pConditions);
}

X
Xiaoyu Wang 已提交
340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366
static int32_t colIdCompare(const void* pLeft, const void* pRight) {
  SColumnNode* pLeftCol = *(SColumnNode**)pLeft;
  SColumnNode* pRightCol = *(SColumnNode**)pRight;
  return pLeftCol->colId > pRightCol->colId ? 1 : -1;
}

// Reorders the nodes of pScanCols in place by ascending column id.
// The node pointers are copied to a temporary array, sorted there, and written back
// into the list cells in order.
// Returns TSDB_CODE_OUT_OF_MEMORY if the temporary array cannot be created or filled;
// the list is left unchanged in that case.
static int32_t sortScanCols(SNodeList* pScanCols) {
  SArray* pArray = taosArrayInit(LIST_LENGTH(pScanCols), POINTER_BYTES);
  if (NULL == pArray) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNode* pCol = NULL;
  FOREACH(pCol, pScanCols) {
    // A dropped element would leave the array shorter than the list and make the
    // write-back loop below read past its end.
    if (NULL == taosArrayPush(pArray, &pCol)) {
      taosArrayDestroy(pArray);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }
  taosArraySort(pArray, colIdCompare);

  int32_t index = 0;
  FOREACH(pCol, pScanCols) {
    REPLACE_NODE(taosArrayGetP(pArray, index++));
  }
  taosArrayDestroy(pArray);

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
367
// Gives the scan node its own copy of pScanCols, sorted by column id.
static int32_t createScanCols(SPhysiPlanContext* pCxt, SScanPhysiNode* pScanPhysiNode, SNodeList* pScanCols) {
  SNodeList* pCloned = nodesCloneList(pScanCols);
  pScanPhysiNode->pScanCols = pCloned;
  if (NULL == pCloned) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  return sortScanCols(pCloned);
}

X
Xiaoyu Wang 已提交
375 376 377 378
// Common tail of all scan node constructors: copies and sorts the scan columns, binds them
// to the output data block, resolves the conditions and fills the generic scan attributes
// (uid, table type, ascending order, count 1, no reverse scan, table name).
// On success *pPhyNode receives the node; on failure the node is destroyed.
static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode, SScanPhysiNode* pScanPhysiNode, SPhysiNode** pPhyNode) {
  int32_t code = createScanCols(pCxt, pScanPhysiNode, pScanLogicNode->pScanCols);
  if (TSDB_CODE_SUCCESS == code) {
    // Data block describe also needs to be set without scanning column, such as SELECT COUNT(*) FROM t
    code = addDataBlockSlots(pCxt, pScanPhysiNode->pScanCols, pScanPhysiNode->node.pOutputDataBlockDesc);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pScanLogicNode, (SPhysiNode*)pScanPhysiNode);
  }

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode(pScanPhysiNode);
    return code;
  }

  pScanPhysiNode->uid = pScanLogicNode->pMeta->uid;
  pScanPhysiNode->tableType = pScanLogicNode->pMeta->tableType;
  pScanPhysiNode->order = TSDB_ORDER_ASC;
  pScanPhysiNode->count = 1;
  pScanPhysiNode->reverse = 0;
  memcpy(&pScanPhysiNode->tableName, &pScanLogicNode->tableName, sizeof(SName));

  *pPhyNode = (SPhysiNode*)pScanPhysiNode;
  return TSDB_CODE_SUCCESS;
}

402 403
// Fills pNodeAddr from a vgroup: the vgroup id becomes the node id and the endpoint set is copied.
static void vgroupInfoToNodeAddr(const SVgroupInfo* vg, SQueryNodeAddr* pNodeAddr) {
  pNodeAddr->epSet = vg->epSet;
  pNodeAddr->nodeId = vg->vgId;
}

X
Xiaoyu Wang 已提交
407
// Creates the physical node for a tag scan.
static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) {
  SPhysiNode* pNew = makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN);
  if (NULL != pNew) {
    return createScanPhysiNodeFinalize(pCxt, pScanLogicNode, (SScanPhysiNode*)pNew, pPhyNode);
  }
  return TSDB_CODE_OUT_OF_MEMORY;
}

X
Xiaoyu Wang 已提交
415
// Creates the physical node for a table scan.
// Besides the scan attributes (flag, range, ratio) this records where the subplan runs:
// the first vgroup of the logic node becomes pSubplan->execNode and is appended to
// pCxt->pExecNodeList, its table count is stored in execNodeStat, and the full database
// name is written to pSubplan->dbFName.
// Returns TSDB_CODE_OUT_OF_MEMORY if the node cannot be created or the exec node list
// cannot grow; the node is destroyed in the latter case.
static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) {
  STableScanPhysiNode* pTableScan = (STableScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
  if (NULL == pTableScan) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pTableScan->scanFlag = pScanLogicNode->scanFlag;
  pTableScan->scanRange = pScanLogicNode->scanRange;
  pTableScan->ratio = pScanLogicNode->ratio;
  vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
  // The push result used to be ignored, so a failed append silently dropped the exec node.
  if (NULL == taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode)) {
    nodesDestroyNode(pTableScan);
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pSubplan->execNodeStat.tableNum = pScanLogicNode->pVgroupList->vgroups[0].numOfTable;
  tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);

  return createScanPhysiNodeFinalize(pCxt, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode);
}

X
Xiaoyu Wang 已提交
432
// Creates the physical node for a system table scan.
// For the user-tables system table the first vgroup of the logic node becomes the
// subplan's exec node and is appended to pCxt->pExecNodeList; for every other system
// table an address built from MNODE_HANDLE and the management endpoint set is appended.
// Returns TSDB_CODE_OUT_OF_MEMORY if the node cannot be created or the exec node list
// cannot grow; the node is destroyed in the latter case.
static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) {
  SSystemTableScanPhysiNode* pScan = (SSystemTableScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN);
  if (NULL == pScan) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pScan->showRewrite = pScanLogicNode->showRewrite;
  pScan->accountId = pCxt->pPlanCxt->acctId;

  // The push results used to be ignored, so a failed append silently dropped the exec node.
  void* pPushed = NULL;
  if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLES)) {
    vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
    pPushed = taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode);
  } else {
    SQueryNodeAddr addr = { .nodeId = MNODE_HANDLE, .epSet = pCxt->pPlanCxt->mgmtEpSet };
    pPushed = taosArrayPush(pCxt->pExecNodeList, &addr);
  }
  if (NULL == pPushed) {
    nodesDestroyNode(pScan);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pScan->mgmtEpSet = pCxt->pPlanCxt->mgmtEpSet;
  tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);

  return createScanPhysiNodeFinalize(pCxt, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
}

X
Xiaoyu Wang 已提交
453
// Creates the physical node for a stream scan. pSubplan is not used here.
static int32_t createStreamScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) {
  SPhysiNode* pNew = makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
  if (NULL != pNew) {
    return createScanPhysiNodeFinalize(pCxt, pScanLogicNode, (SScanPhysiNode*)pNew, pPhyNode);
  }
  return TSDB_CODE_OUT_OF_MEMORY;
}

X
Xiaoyu Wang 已提交
461
// Dispatches to the scan node constructor matching the logic node's scan type.
// Returns TSDB_CODE_FAILED for a scan type that is not handled here.
static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) {
  int32_t code = TSDB_CODE_FAILED;
  switch (pScanLogicNode->scanType) {
    case SCAN_TYPE_TAG:
      code = createTagScanPhysiNode(pCxt, pScanLogicNode, pPhyNode);
      break;
    case SCAN_TYPE_TABLE:
      code = createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
      break;
    case SCAN_TYPE_SYSTEM_TABLE:
      code = createSystemTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
      break;
    case SCAN_TYPE_STREAM:
      code = createStreamScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
      break;
    default:
      break;
  }
  return code;
}

X
Xiaoyu Wang 已提交
477
// Appends to pCols one column node per slot of pDesc. Each column takes the slot's data
// type and id, the descriptor's data block id, and colId -1.
// Returns the first error encountered; columns appended before the failure stay in pCols.
static int32_t createColFromDataBlockDesc(SDataBlockDescNode* pDesc, SNodeList* pCols) {
  SNode* pSlotNode;
  FOREACH(pSlotNode, pDesc->pSlots) {
    SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
    if (NULL == pCol) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    SSlotDescNode* pSlot = (SSlotDescNode*)pSlotNode;
    pCol->colId = -1;
    pCol->slotId = pSlot->slotId;
    pCol->dataBlockId = pDesc->dataBlockId;
    pCol->node.resType = pSlot->dataType;

    int32_t code = nodesListStrictAppend(pCols, pCol);
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }
  }
  return TSDB_CODE_SUCCESS;
}

// Builds the join's output column list: all slots of the left descriptor followed by all
// slots of the right descriptor. On success *pList receives the new list; on failure the
// partially built list is destroyed.
static int32_t createJoinOutputCols(SPhysiPlanContext* pCxt, SDataBlockDescNode* pLeftDesc, SDataBlockDescNode* pRightDesc, SNodeList** pList) {
  SNodeList* pOutputCols = nodesMakeList();
  if (NULL == pOutputCols) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = createColFromDataBlockDesc(pLeftDesc, pOutputCols);
  if (TSDB_CODE_SUCCESS == code) {
    code = createColFromDataBlockDesc(pRightDesc, pOutputCols);
  }

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyList(pOutputCols);
    return code;
  }

  *pList = pOutputCols;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
517
// Creates the physical join node from its two children (pChildren[0] = left, [1] = right):
// resolves the ON conditions against both child data blocks, exposes every child slot as a
// join target, binds those targets to the join's output data block and resolves the
// remaining conditions. On failure the node is destroyed.
static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode, SPhysiNode** pPhyNode) {
  SJoinPhysiNode* pJoinNode = (SJoinPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pJoinLogicNode, QUERY_NODE_PHYSICAL_PLAN_JOIN);
  if (NULL == pJoinNode) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SPhysiNode* pLeftChild = (SPhysiNode*)nodesListGetNode(pChildren, 0);
  SPhysiNode* pRightChild = (SPhysiNode*)nodesListGetNode(pChildren, 1);
  SDataBlockDescNode* pLeftDesc = pLeftChild->pOutputDataBlockDesc;
  SDataBlockDescNode* pRightDesc = pRightChild->pOutputDataBlockDesc;

  int32_t code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pOnConditions, &pJoinNode->pOnConditions);
  if (TSDB_CODE_SUCCESS == code) {
    code = createJoinOutputCols(pCxt, pLeftDesc, pRightDesc, &pJoinNode->pTargets);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = addDataBlockSlots(pCxt, pJoinNode->pTargets, pJoinNode->node.pOutputDataBlockDesc);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoinNode);
  }

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode(pJoinNode);
    return code;
  }

  *pPhyNode = (SPhysiNode*)pJoinNode;
  return TSDB_CODE_SUCCESS;
}

// Rewriter context used while extracting pre-calculated expressions (see collectAndRewrite()).
typedef struct SRewritePrecalcExprsCxt {
  int32_t errCode;           // result reported back by rewritePrecalcExprs()
  int32_t planNodeId;        // first number in generated "#expr_<planNodeId>_<rewriteId>" aliases
  int32_t rewriteId;         // second number in generated aliases
  SNodeList* pPrecalcExprs;  // receives a clone of every extracted expression
} SRewritePrecalcExprsCxt;

// Extracts the expression *pNode into pCxt->pPrecalcExprs and replaces it in place by a
// column node that refers to the extracted expression by name.
//   - The clone appended to pPrecalcExprs keeps its alias, or receives a generated one
//     ("#expr_<planNodeId>_<rewriteId>") when it has none.
//   - The replacement column takes the clone's result type and uses that alias as its name.
// The original node is destroyed. On allocation failure nothing is replaced, pCxt->errCode
// is set to TSDB_CODE_OUT_OF_MEMORY and DEAL_RES_ERROR is returned.
static EDealRes collectAndRewrite(SRewritePrecalcExprsCxt* pCxt, SNode** pNode) {
  SNode* pExpr = nodesCloneNode(*pNode);
  if (NULL == pExpr) {
    pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  // Allocate the replacement column before handing the clone to the list, so that no
  // failure path has to destroy a node the list already references.
  SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
  if (NULL == pCol) {
    nodesDestroyNode(pExpr);
    pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }
  if (TSDB_CODE_SUCCESS != nodesListAppend(pCxt->pPrecalcExprs, pExpr)) {
    nodesDestroyNode((SNode*)pCol);
    nodesDestroyNode(pExpr);
    pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  SExprNode* pRewrittenExpr = (SExprNode*)pExpr;
  pCol->node.resType = pRewrittenExpr->resType;
  if ('\0' == pRewrittenExpr->aliasName[0]) {
    snprintf(pRewrittenExpr->aliasName, sizeof(pRewrittenExpr->aliasName), "#expr_%d_%d", pCxt->planNodeId, pCxt->rewriteId);
    // Advance the counter so that the next generated alias differs; slots are looked up
    // by name, and identical aliases would make distinct expressions share one slot key.
    ++pCxt->rewriteId;
  }
  strcpy(pCol->colName, pRewrittenExpr->aliasName);

  nodesDestroyNode(*pNode);
  *pNode = (SNode*)pCol;
  return DEAL_RES_IGNORE_CHILD;
}

// Rewriter callback: operators, logic conditions and scalar functions are extracted as
// pre-calculated expressions; every other node is left alone and the walk continues.
static EDealRes doRewritePrecalcExprs(SNode** pNode, void* pContext) {
  SRewritePrecalcExprsCxt* pCxt = (SRewritePrecalcExprsCxt*)pContext;
  const ENodeType type = nodeType(*pNode);

  if (QUERY_NODE_OPERATOR == type || QUERY_NODE_LOGIC_CONDITION == type) {
    return collectAndRewrite(pCxt, pNode);
  }
  if (QUERY_NODE_FUNCTION == type && fmIsScalarFunc(((SFunctionNode*)(*pNode))->funcId)) {
    return collectAndRewrite(pCxt, pNode);
  }
  return DEAL_RES_CONTINUE;
}

// Splits the expressions of pList into a pre-calculated part and a rewritten part.
//   - *pRewrittenList (created if NULL) receives a clone of every node of pList; for a
//     grouping set the first entry of its parameter list is cloned instead.
//   - Operators, logic conditions and scalar functions inside the clones are moved to
//     *pPrecalcExprs (created if NULL) and replaced by column references.
//   - If nothing was extracted, *pPrecalcExprs is destroyed and reset to NULL.
// A NULL pList is a no-op. Lists created here are returned through the out parameters even
// on failure, so the caller remains responsible for destroying them.
static int32_t rewritePrecalcExprs(SPhysiPlanContext* pCxt, SNodeList* pList, SNodeList** pPrecalcExprs, SNodeList** pRewrittenList) {
  if (NULL == pList) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == *pPrecalcExprs) {
    *pPrecalcExprs = nodesMakeList();
    if (NULL == *pPrecalcExprs) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }
  if (NULL == *pRewrittenList) {
    *pRewrittenList = nodesMakeList();
    if (NULL == *pRewrittenList) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }
  SNode* pNode = NULL;
  FOREACH(pNode, pList) {
    SNode* pNew = NULL;
    if (QUERY_NODE_GROUPING_SET == nodeType(pNode)) {
      pNew = nodesCloneNode(nodesListGetNode(((SGroupingSetNode*)pNode)->pParameterList, 0));
    } else {
      pNew = nodesCloneNode(pNode);
    }
    if (NULL == pNew) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    if (TSDB_CODE_SUCCESS != nodesListAppend(*pRewrittenList, pNew)) {
      // The list did not take ownership of the clone, so release it here.
      nodesDestroyNode(pNew);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }
  // Start the alias counter at the number of expressions already collected, so calls that
  // share one pPrecalcExprs list do not begin numbering generated aliases from 0 again.
  SRewritePrecalcExprsCxt cxt = {
    .errCode = TSDB_CODE_SUCCESS,
    .rewriteId = LIST_LENGTH(*pPrecalcExprs),
    .pPrecalcExprs = *pPrecalcExprs
  };
  nodesRewriteExprs(*pRewrittenList, doRewritePrecalcExprs, &cxt);
  if (0 == LIST_LENGTH(cxt.pPrecalcExprs)) {
    nodesDestroyList(cxt.pPrecalcExprs);
    *pPrecalcExprs = NULL;
  }
  return cxt.errCode;
}

639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657
// Single-node variant of rewritePrecalcExprs(): on success *pRewritten receives the
// rewritten clone of pNode, and any extracted expressions are added to *pPrecalcExprs.
// A NULL pNode is a no-op that leaves *pRewritten untouched.
// On failure *pRewritten is not modified and any clone created on the way is destroyed.
static int32_t rewritePrecalcExpr(SPhysiPlanContext* pCxt, SNode* pNode, SNodeList** pPrecalcExprs, SNode** pRewritten) {
  if (NULL == pNode) {
    return TSDB_CODE_SUCCESS;
  }

  SNodeList* pList = NULL;
  int32_t code = nodesListMakeAppend(&pList, pNode);
  SNodeList* pRewrittenList = NULL;
  if (TSDB_CODE_SUCCESS == code) {
    code = rewritePrecalcExprs(pCxt, pList, pPrecalcExprs, &pRewrittenList);
  }
  // pNode still belongs to the caller: release the temporary list only.
  nodesClearList(pList);

  if (TSDB_CODE_SUCCESS == code) {
    *pRewritten = nodesListGetNode(pRewrittenList, 0);
    // The rewritten node is handed to the caller: release the list only.
    nodesClearList(pRewrittenList);
  } else {
    // Nobody else references the clone on failure; clearing only the list would leak it.
    nodesDestroyList(pRewrittenList);
  }
  return code;
}

X
Xiaoyu Wang 已提交
658
// Creates the physical aggregate node on top of its single child (pChildren[0]).
//   1. Group keys and aggregate functions are rewritten so that operators, conditions and
//      scalar functions inside them become pre-calculated expressions.
//   2. The pre-calculated expressions are resolved against the child's data block and
//      pushed down into it as additional output slots.
//   3. Group keys and aggregate functions are resolved against the child's data block and
//      bound to the aggregate's own output data block.
//   4. The node's conditions are resolved.
// The temporary rewritten lists are always destroyed; on failure the node is destroyed too.
static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SAggLogicNode* pAggLogicNode, SPhysiNode** pPhyNode) {
  SAggPhysiNode* pAggNode = (SAggPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pAggLogicNode, QUERY_NODE_PHYSICAL_PLAN_AGG);
  if (NULL == pAggNode) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNodeList* pPrecalc = NULL;
  SNodeList* pRewrittenKeys = NULL;
  SNodeList* pRewrittenFuncs = NULL;
  int32_t code = rewritePrecalcExprs(pCxt, pAggLogicNode->pGroupKeys, &pPrecalc, &pRewrittenKeys);
  if (TSDB_CODE_SUCCESS == code) {
    code = rewritePrecalcExprs(pCxt, pAggLogicNode->pAggFuncs, &pPrecalc, &pRewrittenFuncs);
  }

  SPhysiNode* pChild = (SPhysiNode*)nodesListGetNode(pChildren, 0);
  SDataBlockDescNode* pChildDesc = pChild->pOutputDataBlockDesc;

  // push down expression to pOutputDataBlockDesc of child node
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalc) {
    code = setListSlotId(pCxt, pChildDesc->dataBlockId, -1, pPrecalc, &pAggNode->pExprs);
    if (TSDB_CODE_SUCCESS == code) {
      code = pushdownDataBlockSlots(pCxt, pAggNode->pExprs, pChildDesc);
    }
  }

  if (TSDB_CODE_SUCCESS == code && NULL != pRewrittenKeys) {
    code = setListSlotId(pCxt, pChildDesc->dataBlockId, -1, pRewrittenKeys, &pAggNode->pGroupKeys);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlots(pCxt, pAggNode->pGroupKeys, pAggNode->node.pOutputDataBlockDesc);
    }
  }

  if (TSDB_CODE_SUCCESS == code && NULL != pRewrittenFuncs) {
    code = setListSlotId(pCxt, pChildDesc->dataBlockId, -1, pRewrittenFuncs, &pAggNode->pAggFuncs);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlots(pCxt, pAggNode->pAggFuncs, pAggNode->node.pOutputDataBlockDesc);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pAggLogicNode, (SPhysiNode*)pAggNode);
  }

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode(pAggNode);
  } else {
    *pPhyNode = (SPhysiNode*)pAggNode;
  }

  nodesDestroyList(pPrecalc);
  nodesDestroyList(pRewrittenKeys);
  nodesDestroyList(pRewrittenFuncs);

  return code;
}

X
Xiaoyu Wang 已提交
712
// Builds the physical PROJECT node for pProjectLogicNode.
//   pChildren          physical children; the output data block of the first one is the input
//   pProjectLogicNode  supplies limit/offset/slimit/soffset, the projection list and stmtName
//   pPhyNode           receives the new node on success; left untouched on failure
// Returns TSDB_CODE_OUT_OF_MEMORY if the node cannot be made, otherwise the first non-success
// code from setListSlotId / addDataBlockSlotsForProject / setConditionsSlotId (the node is
// destroyed in that case), or TSDB_CODE_SUCCESS.
static int32_t createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SProjectLogicNode* pProjectLogicNode, SPhysiNode** pPhyNode) {
  SProjectPhysiNode* pProject = (SProjectPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pProjectLogicNode, QUERY_NODE_PHYSICAL_PLAN_PROJECT);
  if (NULL == pProject) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // Row-level and group-level limits are copied over unchanged.
  pProject->soffset = pProjectLogicNode->soffset;
  pProject->slimit = pProjectLogicNode->slimit;
  pProject->offset = pProjectLogicNode->offset;
  pProject->limit = pProjectLogicNode->limit;

  // NOTE(review): assumes pChildren holds at least one node; nodesListGetNode's result is not checked.
  SPhysiNode* pInput = (SPhysiNode*)nodesListGetNode(pChildren, 0);

  int32_t code = setListSlotId(pCxt, pInput->pOutputDataBlockDesc->dataBlockId, -1, pProjectLogicNode->pProjections, &pProject->pProjections);
  if (TSDB_CODE_SUCCESS == code) {
    code = addDataBlockSlotsForProject(pCxt, pProjectLogicNode->stmtName, pProject->pProjections, pProject->node.pOutputDataBlockDesc);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pProjectLogicNode, (SPhysiNode*)pProject);
  }

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode(pProject);
    return code;
  }

  *pPhyNode = (SPhysiNode*)pProject;
  return code;
}

X
Xiaoyu Wang 已提交
740
// Creates a physical EXCHANGE node and copies srcGroupId from the logical exchange node.
// Returns TSDB_CODE_OUT_OF_MEMORY when makePhysiNode yields NULL; otherwise stores the node
// in *pPhyNode and returns TSDB_CODE_SUCCESS.
static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode, SPhysiNode** pPhyNode) {
  SExchangePhysiNode* pNew = (SExchangePhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE);
  if (NULL != pNew) {
    pNew->srcGroupId = pExchangeLogicNode->srcGroupId;
    *pPhyNode = (SPhysiNode*)pNew;
    return TSDB_CODE_SUCCESS;
  }

  return TSDB_CODE_OUT_OF_MEMORY;
}
// Creates a STREAM_SCAN physical node in place of an exchange node (used by
// createExchangePhysiNode for stream queries).
//   pExchangeLogicNode  its node.pTargets list is cloned into pScan->pScanCols
//   pPhyNode            receives the new node on success; left untouched on failure
// Returns TSDB_CODE_OUT_OF_MEMORY if the node or the cloned column list cannot be created,
// otherwise the first non-success code from sortScanCols / addDataBlockSlots, or
// TSDB_CODE_SUCCESS. On any failure after allocation the node is destroyed.
static int32_t createStreamScanPhysiNodeByExchange(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode, SPhysiNode** pPhyNode) {
  SStreamScanPhysiNode* pScan = (SStreamScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
  if (NULL == pScan) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  pScan->pScanCols = nodesCloneList(pExchangeLogicNode->node.pTargets);
  if (NULL == pScan->pScanCols) {
    code = TSDB_CODE_OUT_OF_MEMORY;
  }

  // Sort the scan columns once; the same call used to be issued twice in a row.
  if (TSDB_CODE_SUCCESS == code) {
    code = sortScanCols(pScan->pScanCols);
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = addDataBlockSlots(pCxt, pScan->pScanCols, pScan->node.pOutputDataBlockDesc);
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pScan;
  } else {
    nodesDestroyNode(pScan);
  }

  return code;
}

// Dispatches creation of the physical node for a logical exchange node: a stream-scan node
// when the plan context is flagged streamQuery, a regular exchange node otherwise.
// Returns whatever the selected creator returns.
static int32_t createExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode, SPhysiNode** pPhyNode) {
  if (!pCxt->pPlanCxt->streamQuery) {
    return doCreateExchangePhysiNode(pCxt, pExchangeLogicNode, pPhyNode);
  }
  return createStreamScanPhysiNodeByExchange(pCxt, pExchangeLogicNode, pPhyNode);
}

X
Xiaoyu Wang 已提交
792 793 794 795 796 797 798 799 800 801
// Shared final step of the interval / session / state window creators.
//   pChildren         physical children; the first child's output data block is the input
//   pWindow           window part of the already-created physical node; receives pExprs/pFuncs
//   pWindowLogicNode  supplies the window function list (pFuncs)
//   pPhyNode          receives pWindow (cast to SPhysiNode*) on success
// On failure pWindow is destroyed here and the first non-success code is returned.
//
// The temporary lists produced by rewritePrecalcExprs are released before returning, the same
// way createAggPhysiNode releases its lists after the identical setListSlotId sequence;
// previously they were never freed.
static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWinodwPhysiNode* pWindow, SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
  SNodeList* pPrecalcExprs = NULL;
  SNodeList* pFuncs = NULL;
  int32_t code = rewritePrecalcExprs(pCxt, pWindowLogicNode->pFuncs, &pPrecalcExprs, &pFuncs);

  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  // push down expression to pOutputDataBlockDesc of child node
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pWindow->pExprs);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlots(pCxt, pWindow->pExprs, pChildTupe);
    }
  }

  if (TSDB_CODE_SUCCESS == code && NULL != pFuncs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncs, &pWindow->pFuncs);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlots(pCxt, pWindow->pFuncs, pWindow->node.pOutputDataBlockDesc);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pWindow;
  } else {
    nodesDestroyNode(pWindow);
  }

  // Either list may still be NULL here; createAggPhysiNode passes possibly-NULL lists to
  // nodesDestroyList in the same manner.
  nodesDestroyList(pPrecalcExprs);
  nodesDestroyList(pFuncs);

  return code;
}

// Creates the physical INTERVAL window node.
// Copies interval/offset/sliding and their units from the logical node, takes the precision
// from the result type of pTspk (read as an SColumnNode), clones pFill, resolves pTspk via
// setNodeSlotId against the first child's output data block, then hands over to
// createWindowPhysiNodeFinalize.
// Returns TSDB_CODE_OUT_OF_MEMORY if the node or the fill clone cannot be created, the
// setNodeSlotId error code if that step fails (node destroyed in both cases), otherwise the
// result of createWindowPhysiNodeFinalize.
static int32_t createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
  SIntervalPhysiNode* pInterval = (SIntervalPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pWindowLogicNode, QUERY_NODE_PHYSICAL_PLAN_INTERVAL);
  if (NULL == pInterval) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pInterval->interval = pWindowLogicNode->interval;
  pInterval->intervalUnit = pWindowLogicNode->intervalUnit;
  pInterval->sliding = pWindowLogicNode->sliding;
  pInterval->slidingUnit = pWindowLogicNode->slidingUnit;
  pInterval->offset = pWindowLogicNode->offset;
  pInterval->precision = ((SColumnNode*)pWindowLogicNode->pTspk)->node.resType.precision;

  // A NULL clone is only an error when there was a fill clause to clone.
  pInterval->pFill = nodesCloneNode(pWindowLogicNode->pFill);
  if (NULL == pInterval->pFill && NULL != pWindowLogicNode->pFill) {
    nodesDestroyNode(pInterval);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SPhysiNode* pInput = (SPhysiNode*)nodesListGetNode(pChildren, 0);
  SDataBlockDescNode* pInputDesc = pInput->pOutputDataBlockDesc;
  int32_t code = setNodeSlotId(pCxt, pInputDesc->dataBlockId, -1, pWindowLogicNode->pTspk, &pInterval->pTspk);
  if (TSDB_CODE_SUCCESS == code) {
    return createWindowPhysiNodeFinalize(pCxt, pChildren, &pInterval->window, pWindowLogicNode, pPhyNode);
  }

  nodesDestroyNode(pInterval);
  return code;
}

// Creates the physical SESSION_WINDOW node: copies sessionGap into gap and lets
// createWindowPhysiNodeFinalize complete the node.
// Returns TSDB_CODE_OUT_OF_MEMORY when the node cannot be made, otherwise the finalize result.
static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
  SSessionWinodwPhysiNode* pSession = (SSessionWinodwPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pWindowLogicNode, QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW);
  if (NULL != pSession) {
    pSession->gap = pWindowLogicNode->sessionGap;
    return createWindowPhysiNodeFinalize(pCxt, pChildren, &pSession->window, pWindowLogicNode, pPhyNode);
  }

  return TSDB_CODE_OUT_OF_MEMORY;
}

862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895
// Creates the physical STATE_WINDOW node.
// The state expression is passed through rewritePrecalcExpr; any resulting pre-calculation
// expressions are attached to window.pExprs and added to the first child's output data block,
// the state key is resolved via setNodeSlotId and added to this node's output data block, and
// createWindowPhysiNodeFinalize completes the node.
// Returns TSDB_CODE_OUT_OF_MEMORY when the node cannot be made, the first non-success helper
// code (node destroyed), or the finalize result.
// NOTE(review): pPrecalcExprs and pStateKey are not released here; whether the caller owns
// them depends on rewritePrecalcExpr, which is not visible from this function - confirm.
static int32_t createStateWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
  SStateWinodwPhysiNode* pState = (SStateWinodwPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pWindowLogicNode, QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW);
  if (NULL == pState) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNode* pStateKey = NULL;
  SNodeList* pPrecalcExprs = NULL;
  int32_t code = rewritePrecalcExpr(pCxt, pWindowLogicNode->pStateExpr, &pPrecalcExprs, &pStateKey);

  SPhysiNode* pInput = (SPhysiNode*)nodesListGetNode(pChildren, 0);
  SDataBlockDescNode* pInputDesc = pInput->pOutputDataBlockDesc;

  // push down expression to pOutputDataBlockDesc of child node
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
    code = setListSlotId(pCxt, pInputDesc->dataBlockId, -1, pPrecalcExprs, &pState->window.pExprs);
  }
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
    code = addDataBlockSlots(pCxt, pState->window.pExprs, pInputDesc);
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setNodeSlotId(pCxt, pInputDesc->dataBlockId, -1, pStateKey, &pState->pStateKey);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = addDataBlockSlot(pCxt, &pState->pStateKey, pState->window.node.pOutputDataBlockDesc);
  }

  if (TSDB_CODE_SUCCESS == code) {
    return createWindowPhysiNodeFinalize(pCxt, pChildren, &pState->window, pWindowLogicNode, pPhyNode);
  }

  nodesDestroyNode(pState);
  return code;
}

X
Xiaoyu Wang 已提交
896
// Selects the window creator matching pWindowLogicNode->winType (interval, session or state).
// Returns the creator's result, or TSDB_CODE_FAILED for any other window type.
static int32_t createWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
  int32_t code = TSDB_CODE_FAILED;

  switch (pWindowLogicNode->winType) {
    case WINDOW_TYPE_INTERVAL:
      code = createIntervalPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
      break;
    case WINDOW_TYPE_SESSION:
      code = createSessionWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
      break;
    case WINDOW_TYPE_STATE:
      code = createStateWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
      break;
    default:
      break;
  }

  return code;
}

X
Xiaoyu Wang 已提交
910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944
// Creates the physical SORT node.
//   pChildren       physical children; the first child's output data block is the input
//   pSortLogicNode  supplies the sort key list
//   pPhyNode        receives the new node on success; left untouched on failure
// Sort keys go through rewritePrecalcExprs; resulting pre-calculation expressions are attached
// to pSort->pExprs and added to the child's output data block, and the rewritten keys are
// attached to pSort->pSortKeys and added to this node's output data block.
// Returns TSDB_CODE_OUT_OF_MEMORY when the node cannot be made, otherwise the first
// non-success helper code (node destroyed) or TSDB_CODE_SUCCESS.
//
// The temporary lists produced by rewritePrecalcExprs are released before returning, the same
// way createAggPhysiNode releases its lists after the identical setListSlotId sequence;
// previously they were never freed.
static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SSortLogicNode* pSortLogicNode, SPhysiNode** pPhyNode) {
  SSortPhysiNode* pSort = (SSortPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pSortLogicNode, QUERY_NODE_PHYSICAL_PLAN_SORT);
  if (NULL == pSort) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNodeList* pPrecalcExprs = NULL;
  SNodeList* pSortKeys = NULL;
  int32_t code = rewritePrecalcExprs(pCxt, pSortLogicNode->pSortKeys, &pPrecalcExprs, &pSortKeys);

  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  // push down expression to pOutputDataBlockDesc of child node
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pSort->pExprs);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlots(pCxt, pSort->pExprs, pChildTupe);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pSortKeys, &pSort->pSortKeys);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlots(pCxt, pSort->pSortKeys, pSort->node.pOutputDataBlockDesc);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pSort;
  } else {
    nodesDestroyNode(pSort);
  }

  // Either list may still be NULL here; createAggPhysiNode passes possibly-NULL lists to
  // nodesDestroyList in the same manner.
  nodesDestroyList(pPrecalcExprs);
  nodesDestroyList(pSortKeys);

  return code;
}

945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979
// Creates the physical PARTITION node.
//   pChildren       physical children; the first child's output data block is the input
//   pPartLogicNode  supplies the partition key list
//   pPhyNode        receives the new node on success; left untouched on failure
// Partition keys go through rewritePrecalcExprs; resulting pre-calculation expressions are
// attached to pPart->pExprs and added to the child's output data block, and the rewritten keys
// are attached to pPart->pPartitionKeys and added to this node's output data block.
// Returns TSDB_CODE_OUT_OF_MEMORY when the node cannot be made, otherwise the first
// non-success helper code (node destroyed) or TSDB_CODE_SUCCESS.
//
// The temporary lists produced by rewritePrecalcExprs are released before returning, the same
// way createAggPhysiNode releases its lists after the identical setListSlotId sequence;
// previously they were never freed.
static int32_t createPartitionPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SPartitionLogicNode* pPartLogicNode, SPhysiNode** pPhyNode) {
  SPartitionPhysiNode* pPart = (SPartitionPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pPartLogicNode, QUERY_NODE_PHYSICAL_PLAN_PARTITION);
  if (NULL == pPart) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNodeList* pPrecalcExprs = NULL;
  SNodeList* pPartitionKeys = NULL;
  int32_t code = rewritePrecalcExprs(pCxt, pPartLogicNode->pPartitionKeys, &pPrecalcExprs, &pPartitionKeys);

  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  // push down expression to pOutputDataBlockDesc of child node
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pPart->pExprs);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlots(pCxt, pPart->pExprs, pChildTupe);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPartitionKeys, &pPart->pPartitionKeys);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlots(pCxt, pPart->pPartitionKeys, pPart->node.pOutputDataBlockDesc);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pPart;
  } else {
    nodesDestroyNode(pPart);
  }

  // Either list may still be NULL here; createAggPhysiNode passes possibly-NULL lists to
  // nodesDestroyList in the same manner.
  nodesDestroyList(pPrecalcExprs);
  nodesDestroyList(pPartitionKeys);

  return code;
}

X
Xiaoyu Wang 已提交
980 981
// Dispatches on the logical node type to the matching physical-node creator.
//   pSubplan   only forwarded to the scan creator
//   pChildren  already-created physical children, forwarded to every creator except the scan
//              and exchange creators
// Returns the creator's result, or TSDB_CODE_FAILED for a node type without a creator here.
static int32_t doCreatePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, SSubplan* pSubplan, SNodeList* pChildren, SPhysiNode** pPhyNode) {
  int32_t code = TSDB_CODE_FAILED;

  switch (nodeType(pLogicNode)) {
    case QUERY_NODE_LOGIC_PLAN_SCAN:
      code = createScanPhysiNode(pCxt, pSubplan, (SScanLogicNode*)pLogicNode, pPhyNode);
      break;
    case QUERY_NODE_LOGIC_PLAN_JOIN:
      code = createJoinPhysiNode(pCxt, pChildren, (SJoinLogicNode*)pLogicNode, pPhyNode);
      break;
    case QUERY_NODE_LOGIC_PLAN_AGG:
      code = createAggPhysiNode(pCxt, pChildren, (SAggLogicNode*)pLogicNode, pPhyNode);
      break;
    case QUERY_NODE_LOGIC_PLAN_PROJECT:
      code = createProjectPhysiNode(pCxt, pChildren, (SProjectLogicNode*)pLogicNode, pPhyNode);
      break;
    case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
      code = createExchangePhysiNode(pCxt, (SExchangeLogicNode*)pLogicNode, pPhyNode);
      break;
    case QUERY_NODE_LOGIC_PLAN_WINDOW:
      code = createWindowPhysiNode(pCxt, pChildren, (SWindowLogicNode*)pLogicNode, pPhyNode);
      break;
    case QUERY_NODE_LOGIC_PLAN_SORT:
      code = createSortPhysiNode(pCxt, pChildren, (SSortLogicNode*)pLogicNode, pPhyNode);
      break;
    case QUERY_NODE_LOGIC_PLAN_PARTITION:
      code = createPartitionPhysiNode(pCxt, pChildren, (SPartitionLogicNode*)pLogicNode, pPhyNode);
      break;
    default:
      break;
  }

  return code;
}

// Recursively converts a logical node and its children into physical nodes.
//   pLogicNode  root of the logical (sub)tree to convert
//   pSubplan    forwarded to doCreatePhysiNode and to the recursive calls
//   pPhyNode    receives the physical node on success; its pChildren is set to the converted
//               children and each child's pParent is pointed back at it
// Returns TSDB_CODE_OUT_OF_MEMORY if the child list cannot be allocated, otherwise the first
// non-success code encountered (the child list is destroyed in that case) or
// TSDB_CODE_SUCCESS.
static int32_t createPhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, SSubplan* pSubplan, SPhysiNode** pPhyNode) {
  SNodeList* pChildren = nodesMakeList();
  if (NULL == pChildren) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  SNode* pLogicChild;
  FOREACH(pLogicChild, pLogicNode->pChildren) {
    SPhysiNode* pChild = NULL;
    code = createPhysiNode(pCxt, (SLogicNode*)pLogicChild, pSubplan, &pChild);
    if (TSDB_CODE_SUCCESS == code) {
      code = nodesListStrictAppend(pChildren, pChild);
    }
    // Stop at the first failure: continuing would let a later successful child overwrite
    // the error code and the parent would be built on an incomplete child list.
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = doCreatePhysiNode(pCxt, pLogicNode, pSubplan, pChildren, pPhyNode);
  }

  if (TSDB_CODE_SUCCESS == code) {
    (*pPhyNode)->pChildren = pChildren;
    SNode* pChild;
    FOREACH(pChild, (*pPhyNode)->pChildren) {
      ((SPhysiNode*)pChild)->pParent = (*pPhyNode);
    }
  } else {
    nodesDestroyList(pChildren);
  }

  return code;
}

X
Xiaoyu Wang 已提交
1039
// Creates an INSERT data-sink node from pBlocks.
// numOfTables and size are copied; pData is exchanged with the new node's pData through TSWAP,
// so pBlocks->pData is modified by this call.
// Returns TSDB_CODE_OUT_OF_MEMORY when the node cannot be made, otherwise stores it in *pSink
// and returns TSDB_CODE_SUCCESS. pCxt is not used.
static int32_t createDataInserter(SPhysiPlanContext* pCxt, SVgDataBlocks* pBlocks, SDataSinkNode** pSink) {
  SDataInserterNode* pInserter = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_INSERT);
  if (NULL != pInserter) {
    pInserter->size = pBlocks->size;
    pInserter->numOfTables = pBlocks->numOfTables;
    TSWAP(pInserter->pData, pBlocks->pData, char*);

    *pSink = (SDataSinkNode*)pInserter;
    return TSDB_CODE_SUCCESS;
  }

  return TSDB_CODE_OUT_OF_MEMORY;
}

X
Xiaoyu Wang 已提交
1053
// Creates a DISPATCH data-sink node whose input data block descriptor is a clone of
// pRoot->pOutputDataBlockDesc.
// Returns TSDB_CODE_OUT_OF_MEMORY when the node or the clone cannot be created (the node is
// destroyed in the latter case), otherwise stores the node in *pSink and returns
// TSDB_CODE_SUCCESS. pCxt is not used.
static int32_t createDataDispatcher(SPhysiPlanContext* pCxt, const SPhysiNode* pRoot, SDataSinkNode** pSink) {
  SDataDispatcherNode* pDispatcher = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_DISPATCH);
  if (NULL == pDispatcher) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pDispatcher->sink.pInputDataBlockDesc = nodesCloneNode(pRoot->pOutputDataBlockDesc);
  if (NULL != pDispatcher->sink.pInputDataBlockDesc) {
    *pSink = (SDataSinkNode*)pDispatcher;
    return TSDB_CODE_SUCCESS;
  }

  nodesDestroyNode(pDispatcher);
  return TSDB_CODE_OUT_OF_MEMORY;
}

X
Xiaoyu Wang 已提交
1069
// Allocates a physical subplan node and copies id, subplanType and level from the logical
// subplan. Returns NULL when the node cannot be made. pCxt is not used.
static SSubplan* makeSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan) {
  SSubplan* pNew = nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
  if (NULL != pNew) {
    pNew->id = pLogicSubplan->id;
    pNew->level = pLogicSubplan->level;
    pNew->subplanType = pLogicSubplan->subplanType;
  }
  return pNew;
}

X
Xiaoyu Wang 已提交
1080
// Builds one physical subplan from a logical subplan.
//  - SUBPLAN_TYPE_MODIFY: takes msgType and the vgroup epSet from the modify node, pushes the
//    subplan's execNode onto pCxt->pExecNodeList and creates a data inserter sink.
//  - any other type: msgType is TDMT_VND_QUERY, the physical node tree is built with
//    createPhysiNode, and a data dispatcher sink is added unless the plan context is flagged
//    streamQuery or topicQuery.
// Returns TSDB_CODE_OUT_OF_MEMORY when the subplan cannot be made, otherwise the first
// non-success helper code (subplan destroyed) or TSDB_CODE_SUCCESS with *pPhysiSubplan set.
static int32_t createPhysiSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SSubplan** pPhysiSubplan) {
  SSubplan* pNew = makeSubplan(pCxt, pLogicSubplan);
  if (NULL == pNew) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  if (SUBPLAN_TYPE_MODIFY != pLogicSubplan->subplanType) {
    pNew->msgType = TDMT_VND_QUERY;
    code = createPhysiNode(pCxt, pLogicSubplan->pNode, pNew, &pNew->pNode);
    if (TSDB_CODE_SUCCESS == code && !(pCxt->pPlanCxt->streamQuery || pCxt->pPlanCxt->topicQuery)) {
      code = createDataDispatcher(pCxt, pNew->pNode, &pNew->pDataSink);
    }
  } else {
    SVnodeModifLogicNode* pModif = (SVnodeModifLogicNode*)pLogicSubplan->pNode;
    pNew->msgType = pModif->msgType;
    pNew->execNode.epSet = pModif->pVgDataBlocks->vg.epSet;
    // NOTE(review): the result of taosArrayPush is ignored, as before - confirm whether a
    // failed push should be reported.
    taosArrayPush(pCxt->pExecNodeList, &pNew->execNode);
    code = createDataInserter(pCxt, pModif->pVgDataBlocks, &pNew->pDataSink);
  }

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode(pNew);
    return code;
  }

  *pPhysiSubplan = pNew;
  return code;
}

X
Xiaoyu Wang 已提交
1111 1112 1113 1114 1115 1116 1117 1118 1119
// Allocates an empty physical query plan: a PHYSICAL_PLAN node with a fresh pSubplans list and
// the queryId taken from the plan context.
// Returns NULL if either allocation fails (the plan node is destroyed when only the list
// allocation fails).
static SQueryPlan* makeQueryPhysiPlan(SPhysiPlanContext* pCxt) {
  SQueryPlan* pNew = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN);
  if (NULL == pNew) {
    return NULL;
  }

  pNew->pSubplans = nodesMakeList();
  if (NULL != pNew->pSubplans) {
    pNew->queryId = pCxt->pPlanCxt->queryId;
    return pNew;
  }

  nodesDestroyNode(pNew);
  return NULL;
}

// Appends pSubplan to the group stored at index `level` of pSubplans.
// When `level` is not below the current list length, one new NODE_LIST group is created and
// appended to pSubplans; otherwise the existing group at that index is used. The group's
// pNodeList is created on first use.
// Returns TSDB_CODE_OUT_OF_MEMORY if a group or list cannot be created or appended, otherwise
// the result of appending pSubplan with nodesListStrictAppend. pCxt is not used.
static int32_t pushSubplan(SPhysiPlanContext* pCxt, SNodeptr pSubplan, int32_t level, SNodeList* pSubplans) {
  SNodeListNode* pLevelGroup = NULL;

  if (level < LIST_LENGTH(pSubplans)) {
    pLevelGroup = nodesListGetNode(pSubplans, level);
  } else {
    pLevelGroup = nodesMakeNode(QUERY_NODE_NODE_LIST);
    if (NULL == pLevelGroup) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pSubplans, pLevelGroup)) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  if (NULL == pLevelGroup->pNodeList) {
    pLevelGroup->pNodeList = nodesMakeList();
    if (NULL == pLevelGroup->pNodeList) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  return nodesListStrictAppend(pLevelGroup->pNodeList, pSubplan);
}

X
Xiaoyu Wang 已提交
1147 1148 1149
// Recursively builds the physical subplan for pLogicSubplan and all of its children.
//   pParent     physical subplan of the parent logical subplan, or NULL for a top-level one
//   pQueryPlan  plan being filled; the new subplan is pushed into pSubplans at the logical
//               subplan's level and numOfSubplans is incremented
// When pParent is given, the new subplan is appended to pParent->pChildren and pParent to the
// new subplan's pParents. Child building stops at the first failure.
// Returns the first non-success code, or TSDB_CODE_SUCCESS.
// NOTE(review): on failure the subplan is passed to nodesDestroyNode even if it was already
// pushed into pQueryPlan->pSubplans or linked to pParent, and numOfSubplans is incremented
// even when pushSubplan fails - ownership on these paths should be confirmed.
static int32_t buildPhysiPlan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SSubplan* pParent, SQueryPlan* pQueryPlan) {
  SSubplan* pNew = NULL;
  int32_t code = createPhysiSubplan(pCxt, pLogicSubplan, &pNew);

  if (TSDB_CODE_SUCCESS == code) {
    code = pushSubplan(pCxt, pNew, pLogicSubplan->level, pQueryPlan->pSubplans);
    ++(pQueryPlan->numOfSubplans);
  }

  // Link parent and child in both directions.
  if (NULL != pParent && TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeAppend(&pParent->pChildren, pNew);
    if (TSDB_CODE_SUCCESS == code) {
      code = nodesListMakeAppend(&pNew->pParents, pParent);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    SNode* pLogicChild = NULL;
    FOREACH(pLogicChild, pLogicSubplan->pChildren) {
      code = buildPhysiPlan(pCxt, (SLogicSubplan*)pLogicChild, pNew, pQueryPlan);
      if (TSDB_CODE_SUCCESS != code) {
        break;
      }
    }
  }

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode(pNew);
  }

  return code;
}

X
Xiaoyu Wang 已提交
1180 1181 1182 1183
// Creates the whole physical plan: allocates an empty plan and builds every top-level logical
// subplan into it, stopping at the first failure.
// Returns TSDB_CODE_OUT_OF_MEMORY when the plan cannot be allocated, the first non-success
// code from buildPhysiPlan (plan destroyed), or TSDB_CODE_SUCCESS with *pPhysiPlan set.
static int32_t doCreatePhysiPlan(SPhysiPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPhysiPlan) {
  SQueryPlan* pNewPlan = makeQueryPhysiPlan(pCxt);
  if (NULL == pNewPlan) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  SNode* pTop = NULL;
  FOREACH(pTop, pLogicPlan->pTopSubplans) {
    code = buildPhysiPlan(pCxt, (SLogicSubplan*)pTop, NULL, pNewPlan);
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
  }

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode(pNewPlan);
    return code;
  }

  *pPhysiPlan = pNewPlan;
  return code;
}
X
Xiaoyu Wang 已提交
1204

X
Xiaoyu Wang 已提交
1205 1206
static void destoryLocationHash(void* p) {
  SHashObj* pHash = *(SHashObj**)p;
X
bugfix  
Xiaoyu Wang 已提交
1207 1208 1209 1210 1211
  SSlotIndex* pIndex = taosHashIterate(pHash, NULL);
  while (NULL != pIndex) {
    taosArrayDestroy(pIndex->pSlotIdsInfo);
    pIndex = taosHashIterate(pHash, pIndex);
  }
X
Xiaoyu Wang 已提交
1212 1213 1214 1215 1216 1217 1218
  taosHashCleanup(pHash);
}

// Releases what the context owns: the pLocationHelper array, with destoryLocationHash applied
// to each element. The other context fields are not touched.
static void destoryPhysiPlanContext(SPhysiPlanContext* pCxt) {
  SArray* pHelper = pCxt->pLocationHelper;
  taosArrayDestroyEx(pHelper, destoryLocationHash);
}

1219 1220 1221 1222 1223 1224 1225 1226 1227 1228
// Fills pPlan->explainInfo from the AST root of the plan context.
// For an EXPLAIN statement the mode is ANALYZE or STATIC depending on pStmt->analyze and
// verbose is copied from the statement options; for any other statement the mode is DISABLE
// and verbose is left as is.
static void setExplainInfo(SPlanContext* pCxt, SQueryPlan* pPlan) {
  if (QUERY_NODE_EXPLAIN_STMT != nodeType(pCxt->pAstRoot)) {
    pPlan->explainInfo.mode = EXPLAIN_MODE_DISABLE;
    return;
  }

  SExplainStmt* pExplain = (SExplainStmt*)pCxt->pAstRoot;
  if (pExplain->analyze) {
    pPlan->explainInfo.mode = EXPLAIN_MODE_ANALYZE;
  } else {
    pPlan->explainInfo.mode = EXPLAIN_MODE_STATIC;
  }
  pPlan->explainInfo.verbose = pExplain->pOptions->verbose;
}

X
Xiaoyu Wang 已提交
1229
// Entry point: turns a logical query plan into a physical query plan.
//   pCxt           planner context (AST root, query id, stream/topic flags)
//   pLogicPlan     logical plan to convert
//   pPlan          receives the physical plan on success
//   pExecNodeList  array stored in the working context; createPhysiSubplan pushes the exec
//                  node of every modify subplan onto it
// Returns TSDB_CODE_OUT_OF_MEMORY if the location-helper array cannot be allocated, otherwise
// the result of doCreatePhysiPlan. Explain info is filled in only on success; the working
// context is released in both cases.
int32_t createPhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan, SArray* pExecNodeList) {
  SPhysiPlanContext cxt = {0};
  cxt.pPlanCxt = pCxt;
  cxt.errCode = TSDB_CODE_SUCCESS;
  cxt.nextDataBlockId = 0;
  cxt.pLocationHelper = taosArrayInit(32, POINTER_BYTES);
  cxt.pExecNodeList = pExecNodeList;

  if (NULL == cxt.pLocationHelper) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = doCreatePhysiPlan(&cxt, pLogicPlan, pPlan);
  if (TSDB_CODE_SUCCESS == code) {
    setExplainInfo(pCxt, *pPlan);
  }

  destoryPhysiPlanContext(&cxt);
  return code;
}