planPhysiCreater.c 46.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

X
Xiaoyu Wang 已提交
16
#include "planInt.h"
X
Xiaoyu Wang 已提交
17

X
Xiaoyu Wang 已提交
18
#include "catalog.h"
X
Xiaoyu Wang 已提交
19
#include "functionMgt.h"
X
Xiaoyu Wang 已提交
20
#include "tglobal.h"
X
Xiaoyu Wang 已提交
21

X
bugfix  
Xiaoyu Wang 已提交
22 23
// One slot registered under a slot key.
typedef struct SSlotIdInfo {
  int16_t slotId;  // id of the slot inside its data block
  bool    set;     // becomes true once getUnsetSlotId() has handed this slot out
} SSlotIdInfo;

X
Xiaoyu Wang 已提交
27 28
typedef struct SSlotIndex {
  int16_t dataBlockId;
X
Xiaoyu Wang 已提交
29
  SArray* pSlotIdsInfo;  // duplicate name slot
X
Xiaoyu Wang 已提交
30 31 32
} SSlotIndex;

typedef struct SPhysiPlanContext {
X
Xiaoyu Wang 已提交
33
  SPlanContext* pPlanCxt;
X
Xiaoyu Wang 已提交
34 35 36 37
  int32_t       errCode;
  int16_t       nextDataBlockId;
  SArray*       pLocationHelper;
  SArray*       pExecNodeList;
X
Xiaoyu Wang 已提交
38 39
} SPhysiPlanContext;

X
Xiaoyu Wang 已提交
40
/**
 * Format the slot lookup key of pNode into pKey.
 *
 * Key shapes:
 *   - "<pStmtName>.<aliasName>" whenever pStmtName is a non-empty string;
 *   - otherwise, for a column node, "<tableAlias>.<colName>", or just
 *     "<colName>" when the table alias is empty;
 *   - otherwise "<aliasName>" of the expression.
 *
 * @return the value returned by sprintf (length of the key written).
 *
 * NOTE: pKey is filled with an unbounded sprintf; the caller is responsible
 * for supplying a buffer large enough for the key.
 */
static int32_t getSlotKey(SNode* pNode, const char* pStmtName, char* pKey) {
  const bool hasStmtName = (NULL != pStmtName && '\0' != pStmtName[0]);

  if (QUERY_NODE_COLUMN != nodeType(pNode)) {
    const char* pAlias = ((SExprNode*)pNode)->aliasName;
    if (hasStmtName) {
      return sprintf(pKey, "%s.%s", pStmtName, pAlias);
    }
    return sprintf(pKey, "%s", pAlias);
  }

  SColumnNode* pColumn = (SColumnNode*)pNode;
  if (hasStmtName) {
    return sprintf(pKey, "%s.%s", pStmtName, pColumn->node.aliasName);
  }
  if ('\0' != pColumn->tableAlias[0]) {
    return sprintf(pKey, "%s.%s", pColumn->tableAlias, pColumn->colName);
  }
  return sprintf(pKey, "%s", pColumn->colName);
}

X
bugfix  
Xiaoyu Wang 已提交
58
/**
 * Allocate a slot descriptor node for pNode.
 *
 * The descriptor takes slotId and output as given, copies the result type of
 * pNode (read as an SExprNode) and clears the reserve flag.
 *
 * @return the new node, or NULL when nodesMakeNode() fails.
 */
static SNode* createSlotDesc(SPhysiPlanContext* pCxt, const SNode* pNode, int16_t slotId, bool output) {
  SSlotDescNode* pDesc = (SSlotDescNode*)nodesMakeNode(QUERY_NODE_SLOT_DESC);
  if (NULL != pDesc) {
    pDesc->slotId = slotId;
    pDesc->dataType = ((SExprNode*)pNode)->resType;
    pDesc->reserve = false;
    pDesc->output = output;
  }
  return (SNode*)pDesc;
}

X
bugfix  
Xiaoyu Wang 已提交
70
/**
 * Wrap pNode in a new target node bound to (dataBlockId, slotId).
 *
 * On success *pOutput receives the target, whose pExpr points at pNode.
 * *pOutput is left untouched on failure.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the node cannot be created.
 */
static int32_t createTarget(SNode* pNode, int16_t dataBlockId, int16_t slotId, SNode** pOutput) {
  STargetNode* pNewTarget = (STargetNode*)nodesMakeNode(QUERY_NODE_TARGET);
  if (NULL != pNewTarget) {
    pNewTarget->dataBlockId = dataBlockId;
    pNewTarget->slotId = slotId;
    pNewTarget->pExpr = pNode;

    *pOutput = (SNode*)pNewTarget;
    return TSDB_CODE_SUCCESS;
  }
  return TSDB_CODE_OUT_OF_MEMORY;
}

X
bugfix  
Xiaoyu Wang 已提交
84
/**
 * Register slotId under the key (pName, len) in the slot hash of a data block.
 *
 * If the key already exists, the slot is appended to the existing entry so
 * that slots sharing a name are all kept. Otherwise a new SSlotIndex holding
 * this single slot is inserted.
 *
 * Failures of taosArrayPush() are reported as TSDB_CODE_OUT_OF_MEMORY, and the
 * freshly created slot array is released when it could not be stored in the
 * hash, so no array is leaked on the error paths.
 * NOTE(review): taosArrayPush() is treated as returning NULL on failure —
 * confirm against tarray.h.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the result of taosHashPut().
 */
static int32_t putSlotToHashImpl(int16_t dataBlockId, int16_t slotId, const char* pName, int32_t len, SHashObj* pHash) {
  SSlotIdInfo info = {.slotId = slotId, .set = false};

  SSlotIndex* pIndex = taosHashGet(pHash, pName, len);
  if (NULL != pIndex) {
    if (NULL == taosArrayPush(pIndex->pSlotIdsInfo, &info)) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    return TSDB_CODE_SUCCESS;
  }

  SSlotIndex index = {.dataBlockId = dataBlockId, .pSlotIdsInfo = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SSlotIdInfo))};
  if (NULL == index.pSlotIdsInfo) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  if (NULL == taosArrayPush(index.pSlotIdsInfo, &info)) {
    taosArrayDestroy(index.pSlotIdsInfo);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = taosHashPut(pHash, pName, len, &index, sizeof(SSlotIndex));
  if (TSDB_CODE_SUCCESS != code) {
    // the hash did not take the entry, so the array is still owned here
    taosArrayDestroy(index.pSlotIdsInfo);
  }
  return code;
}

/**
 * Register slotId in pHash under the default key of pNode
 * (the key produced by getSlotKey() without a statement name).
 */
static int32_t putSlotToHash(int16_t dataBlockId, int16_t slotId, SNode* pNode, SHashObj* pHash) {
  char          key[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN];
  const int32_t keyLen = getSlotKey(pNode, NULL, key);
  return putSlotToHashImpl(dataBlockId, slotId, key, keyLen, pHash);
}

X
Xiaoyu Wang 已提交
107 108
/**
 * Create the slot hash of a data block and record it in
 * pCxt->pLocationHelper at index dataBlockId.
 *
 * On success *pDescHash receives the hash. If the hash cannot be stored in
 * the helper array it is cleaned up again before returning.
 *
 * @return TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY.
 */
static int32_t createDataBlockDescHash(SPhysiPlanContext* pCxt, int32_t capacity, int16_t dataBlockId,
                                       SHashObj** pDescHash) {
  SHashObj* pSlotHash =
      taosHashInit(capacity, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (NULL == pSlotHash) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  if (NULL != taosArrayInsert(pCxt->pLocationHelper, dataBlockId, &pSlotHash)) {
    *pDescHash = pSlotHash;
    return TSDB_CODE_SUCCESS;
  }

  taosHashCleanup(pSlotHash);
  return TSDB_CODE_OUT_OF_MEMORY;
}

X
Xiaoyu Wang 已提交
122 123
/**
 * Create the initial slot list of pDataBlockDesc from pList.
 *
 * Each node of pList gets one output slot, numbered from 0 in list order, and
 * is registered in pHash. The byte width of its result type is added to both
 * totalRowSize and outputRowSize. Processing stops at the first failure.
 *
 * @return TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t buildDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc,
                                   SHashObj* pHash) {
  pDataBlockDesc->pSlots = nodesMakeList();
  if (NULL == pDataBlockDesc->pSlots) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  int16_t nextSlot = 0;
  SNode*  pItem = NULL;
  FOREACH(pItem, pList) {
    code = nodesListStrictAppend(pDataBlockDesc->pSlots, createSlotDesc(pCxt, pItem, nextSlot, true));
    if (TSDB_CODE_SUCCESS == code) {
      code = putSlotToHash(pDataBlockDesc->dataBlockId, nextSlot, pItem, pHash);
    }
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }

    pDataBlockDesc->totalRowSize += ((SExprNode*)pItem)->resType.bytes;
    pDataBlockDesc->outputRowSize += ((SExprNode*)pItem)->resType.bytes;
    ++nextSlot;
  }
  return code;
}

/**
 * Create a data block descriptor whose slots are built from pList.
 *
 * The descriptor receives the next data block id of pCxt (which is then
 * incremented) and a slot hash recorded in pCxt->pLocationHelper.
 * On failure the descriptor node is destroyed and *pDataBlockDesc is not written.
 *
 * @return TSDB_CODE_SUCCESS or an error code.
 */
static int32_t createDataBlockDesc(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode** pDataBlockDesc) {
  SDataBlockDescNode* pNewDesc = nodesMakeNode(QUERY_NODE_DATABLOCK_DESC);
  if (NULL == pNewDesc) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pNewDesc->dataBlockId = pCxt->nextDataBlockId++;

  SHashObj* pSlotHash = NULL;
  int32_t   code = createDataBlockDescHash(pCxt, LIST_LENGTH(pList), pNewDesc->dataBlockId, &pSlotHash);
  if (TSDB_CODE_SUCCESS == code) {
    code = buildDataBlockSlots(pCxt, pList, pNewDesc, pSlotHash);
  }

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode(pNewDesc);
    return code;
  }

  *pDataBlockDesc = pNewDesc;
  return code;
}

X
bugfix  
Xiaoyu Wang 已提交
170 171 172 173 174 175 176 177 178 179 180 181
/**
 * Pick a slot id from the slots registered under one key.
 *
 * The first entry whose `set` flag is still false is marked as set and its
 * slot id returned. When every entry has already been handed out, the slot
 * id of the first entry is returned.
 */
static int16_t getUnsetSlotId(const SArray* pSlotIdsInfo) {
  const int32_t count = taosArrayGetSize(pSlotIdsInfo);
  int32_t       pos = 0;
  while (pos < count) {
    SSlotIdInfo* pCandidate = taosArrayGet(pSlotIdsInfo, pos);
    if (!pCandidate->set) {
      pCandidate->set = true;
      return pCandidate->slotId;
    }
    ++pos;
  }

  SSlotIdInfo* pFirst = taosArrayGet(pSlotIdsInfo, 0);
  return pFirst->slotId;
}

X
Xiaoyu Wang 已提交
182 183
/**
 * Bind every node of pList to a slot of pDataBlockDesc and replace the node
 * in the list by a target node referring to that slot.
 *
 * For each list entry (the inner expression is used for ORDER BY entries):
 *   - if its key is not yet in the slot hash of the data block, a new slot is
 *     appended to the descriptor, registered in the hash and accounted in
 *     totalRowSize (and in outputRowSize when `output` is true);
 *   - otherwise one of the slots already registered under the key is reused
 *     via getUnsetSlotId().
 *
 * New slot ids continue from the number of slots already present in the
 * descriptor. The hash key count is not used for this because several slots
 * may share one key, which would make new ids collide with existing slots.
 * Row sizes and the slot counter only advance once the slot was registered
 * successfully.
 *
 * @param pStmtName optional statement name used as key prefix (may be NULL).
 * @param output    whether newly created slots are output slots.
 * @return TSDB_CODE_SUCCESS or the first error; a NULL pList is a no-op.
 */
static int32_t addDataBlockSlotsImpl(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc,
                                     const char* pStmtName, bool output) {
  if (NULL == pList) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t   code = TSDB_CODE_SUCCESS;
  SHashObj* pHash = taosArrayGetP(pCxt->pLocationHelper, pDataBlockDesc->dataBlockId);
  int16_t   nextSlotId = LIST_LENGTH(pDataBlockDesc->pSlots), slotId = 0;
  SNode*    pNode = NULL;
  FOREACH(pNode, pList) {
    SNode*      pExpr = QUERY_NODE_ORDER_BY_EXPR == nodeType(pNode) ? ((SOrderByExprNode*)pNode)->pExpr : pNode;
    char        name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN] = {0};
    int32_t     len = getSlotKey(pExpr, pStmtName, name);
    SSlotIndex* pIndex = taosHashGet(pHash, name, len);
    if (NULL == pIndex) {
      code = nodesListStrictAppend(pDataBlockDesc->pSlots, createSlotDesc(pCxt, pExpr, nextSlotId, output));
      if (TSDB_CODE_SUCCESS == code) {
        code = putSlotToHashImpl(pDataBlockDesc->dataBlockId, nextSlotId, name, len, pHash);
      }
      if (TSDB_CODE_SUCCESS == code) {
        pDataBlockDesc->totalRowSize += ((SExprNode*)pExpr)->resType.bytes;
        if (output) {
          pDataBlockDesc->outputRowSize += ((SExprNode*)pExpr)->resType.bytes;
        }
        slotId = nextSlotId;
        ++nextSlotId;
      }
    } else {
      slotId = getUnsetSlotId(pIndex->pSlotIdsInfo);
    }

    if (TSDB_CODE_SUCCESS == code) {
      SNode* pTarget = NULL;
      code = createTarget(pNode, pDataBlockDesc->dataBlockId, slotId, &pTarget);
      if (TSDB_CODE_SUCCESS == code) {
        REPLACE_NODE(pTarget);
      }
    }

    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
  }
  return code;
}

// Bind the nodes of pList to slots of pDataBlockDesc: no statement name prefix,
// newly created slots are not output slots.
static int32_t addDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) {
  const char* pNoStmtName = NULL;
  const bool  asOutput = false;
  return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, pNoStmtName, asOutput);
}

231 232 233 234 235 236
/**
 * Single-node variant of addDataBlockSlots().
 *
 * *pNode is put into a temporary one-element list, bound to a slot, and on
 * success replaced by the list's (rewritten) first node. The temporary list
 * is released with nodesClearList(). A NULL pNode or NULL *pNode is a no-op.
 */
static int32_t addDataBlockSlot(SPhysiPlanContext* pCxt, SNode** pNode, SDataBlockDescNode* pDataBlockDesc) {
  if (NULL == pNode || NULL == *pNode) {
    return TSDB_CODE_SUCCESS;
  }

  SNodeList* pSingle = NULL;
  int32_t    code = nodesListMakeAppend(&pSingle, *pNode);
  if (TSDB_CODE_SUCCESS == code) {
    code = addDataBlockSlots(pCxt, pSingle, pDataBlockDesc);
    if (TSDB_CODE_SUCCESS == code) {
      *pNode = nodesListGetNode(pSingle, 0);
    }
  }
  nodesClearList(pSingle);
  return code;
}

X
Xiaoyu Wang 已提交
248 249
// Bind the nodes of pList to slots of pDataBlockDesc using pStmtName as key
// prefix; newly created slots are output slots.
static int32_t addDataBlockSlotsForProject(SPhysiPlanContext* pCxt, const char* pStmtName, SNodeList* pList,
                                           SDataBlockDescNode* pDataBlockDesc) {
  const bool asOutput = true;
  return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, pStmtName, asOutput);
}

// Bind the nodes of pList to slots of pDataBlockDesc: no statement name prefix,
// newly created slots are output slots.
static int32_t pushdownDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) {
  const char* pNoStmtName = NULL;
  const bool  asOutput = true;
  return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, pNoStmtName, asOutput);
}

typedef struct SSetSlotIdCxt {
X
Xiaoyu Wang 已提交
258
  int32_t   errCode;
X
Xiaoyu Wang 已提交
259 260 261 262 263 264 265
  SHashObj* pLeftHash;
  SHashObj* pRightHash;
} SSetSlotIdCxt;

/**
 * Expression walker callback: resolve a column node to its data block / slot.
 *
 * Column nodes other than "*" are looked up by their default slot key, first
 * in the left hash and then in the right hash. The data block id of the entry
 * and the slot id of its first registered slot are stored in the column node.
 * A column missing from both hashes is an internal error.
 */
static EDealRes doSetSlotId(SNode* pNode, void* pContext) {
  if (QUERY_NODE_COLUMN != nodeType(pNode)) {
    return DEAL_RES_CONTINUE;
  }
  SColumnNode* pColumn = (SColumnNode*)pNode;
  if (0 == strcmp(pColumn->colName, "*")) {
    return DEAL_RES_CONTINUE;
  }

  SSetSlotIdCxt* pWalkCxt = (SSetSlotIdCxt*)pContext;
  char           key[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN];
  int32_t        keyLen = getSlotKey(pNode, NULL, key);

  SSlotIndex* pEntry = taosHashGet(pWalkCxt->pLeftHash, key, keyLen);
  if (NULL == pEntry) {
    pEntry = taosHashGet(pWalkCxt->pRightHash, key, keyLen);
  }
  if (NULL == pEntry) {
    // every column is expected to be registered; reaching this point is a planner bug
    pWalkCxt->errCode = TSDB_CODE_PLAN_INTERNAL_ERROR;
    return DEAL_RES_ERROR;
  }

  SSlotIdInfo* pFirstSlot = (SSlotIdInfo*)taosArrayGet(pEntry->pSlotIdsInfo, 0);
  pColumn->dataBlockId = pEntry->dataBlockId;
  pColumn->slotId = pFirstSlot->slotId;
  return DEAL_RES_IGNORE_CHILD;
}

X
Xiaoyu Wang 已提交
284 285
/**
 * Clone pNode and resolve the columns of the clone against the slot hashes of
 * the given data blocks (a negative rightDataBlockId means "no right block").
 *
 * On success *pOutput receives the clone; on failure the clone is destroyed.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the walker's error code.
 */
static int32_t setNodeSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, int16_t rightDataBlockId, SNode* pNode,
                             SNode** pOutput) {
  SNode* pClone = nodesCloneNode(pNode);
  if (NULL == pClone) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SSetSlotIdCxt walkCxt = {
      .errCode = TSDB_CODE_SUCCESS,
      .pLeftHash = taosArrayGetP(pCxt->pLocationHelper, leftDataBlockId),
      .pRightHash = (rightDataBlockId < 0 ? NULL : taosArrayGetP(pCxt->pLocationHelper, rightDataBlockId))};
  nodesWalkExpr(pClone, doSetSlotId, &walkCxt);

  if (TSDB_CODE_SUCCESS == walkCxt.errCode) {
    *pOutput = pClone;
    return TSDB_CODE_SUCCESS;
  }

  nodesDestroyNode(pClone);
  return walkCxt.errCode;
}

X
Xiaoyu Wang 已提交
305 306
/**
 * List variant of setNodeSlotId(): clone pList and resolve the columns of
 * every cloned expression against the slot hashes of the given data blocks
 * (a negative rightDataBlockId means "no right block").
 *
 * On success *pOutput receives the cloned list; on failure it is destroyed.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the walker's error code.
 */
static int32_t setListSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, int16_t rightDataBlockId,
                             const SNodeList* pList, SNodeList** pOutput) {
  SNodeList* pClones = nodesCloneList(pList);
  if (NULL == pClones) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SSetSlotIdCxt walkCxt = {
      .errCode = TSDB_CODE_SUCCESS,
      .pLeftHash = taosArrayGetP(pCxt->pLocationHelper, leftDataBlockId),
      .pRightHash = (rightDataBlockId < 0 ? NULL : taosArrayGetP(pCxt->pLocationHelper, rightDataBlockId))};
  nodesWalkExprs(pClones, doSetSlotId, &walkCxt);

  if (TSDB_CODE_SUCCESS == walkCxt.errCode) {
    *pOutput = pClones;
    return TSDB_CODE_SUCCESS;
  }

  nodesDestroyList(pClones);
  return walkCxt.errCode;
}

325 326 327 328 329 330 331 332 333 334 335 336
/**
 * Derive a precision value from the output descriptors of the child nodes:
 * the child's precision for one child, the smaller of the two values for two
 * children, and 0 for any other child count.
 */
static uint8_t getPrecision(SNodeList* pChildren) {
  switch (LIST_LENGTH(pChildren)) {
    case 1: {
      SPhysiNode* pOnly = (SPhysiNode*)nodesListGetNode(pChildren, 0);
      return pOnly->pOutputDataBlockDesc->precision;
    }
    case 2: {
      SPhysiNode* pLeft = (SPhysiNode*)nodesListGetNode(pChildren, 0);
      SPhysiNode* pRight = (SPhysiNode*)nodesListGetNode(pChildren, 1);
      uint8_t     leftPrec = pLeft->pOutputDataBlockDesc->precision;
      uint8_t     rightPrec = pRight->pOutputDataBlockDesc->precision;
      return (leftPrec > rightPrec ? rightPrec : leftPrec);
    }
    default:
      return 0;
  }
}

/**
 * Allocate a physical plan node of the given type and give it an output data
 * block descriptor built from the targets of pLogicNode, with the supplied
 * precision.
 *
 * @return the node, or NULL when the node or its descriptor cannot be created
 *         (the node is destroyed in the latter case).
 */
static SPhysiNode* makePhysiNode(SPhysiPlanContext* pCxt, uint8_t precision, SLogicNode* pLogicNode, ENodeType type) {
  SPhysiNode* pNew = (SPhysiNode*)nodesMakeNode(type);
  if (NULL == pNew) {
    return NULL;
  }

  if (TSDB_CODE_SUCCESS != createDataBlockDesc(pCxt, pLogicNode->pTargets, &pNew->pOutputDataBlockDesc)) {
    nodesDestroyNode(pNew);
    return NULL;
  }

  pNew->pOutputDataBlockDesc->precision = precision;
  return pNew;
}

// Copy the conditions of pLogicNode (if any) into pPhysiNode, resolving their
// columns against the physical node's own output data block.
static int32_t setConditionsSlotId(SPhysiPlanContext* pCxt, const SLogicNode* pLogicNode, SPhysiNode* pPhysiNode) {
  if (NULL == pLogicNode->pConditions) {
    return TSDB_CODE_SUCCESS;
  }
  const int16_t outputBlockId = pPhysiNode->pOutputDataBlockDesc->dataBlockId;
  return setNodeSlotId(pCxt, outputBlockId, -1, pLogicNode->pConditions, &pPhysiNode->pConditions);
}

X
Xiaoyu Wang 已提交
359 360 361 362 363 364 365 366 367 368 369 370 371
/**
 * Sort comparator ordering SColumnNode pointers by ascending colId.
 *
 * pLeft and pRight point at array elements, each holding an SColumnNode*.
 * Returns a negative value, zero, or a positive value when the left colId is
 * smaller than, equal to, or greater than the right one. Equal ids compare as
 * 0 so that the ordering is consistent in both argument orders, as sort
 * routines require.
 */
static int32_t colIdCompare(const void* pLeft, const void* pRight) {
  SColumnNode* pLeftCol = *(SColumnNode**)pLeft;
  SColumnNode* pRightCol = *(SColumnNode**)pRight;
  if (pLeftCol->colId < pRightCol->colId) {
    return -1;
  }
  return pLeftCol->colId > pRightCol->colId ? 1 : 0;
}

/**
 * Reorder the nodes of pScanCols in place by ascending column id.
 *
 * The node pointers are copied into a temporary array, sorted with
 * colIdCompare(), and written back into the list cells in sorted order.
 * If a pointer cannot be stored in the temporary array the list is left
 * unchanged and TSDB_CODE_OUT_OF_MEMORY is returned; writing back from a
 * short array would otherwise read past its end.
 * NOTE(review): taosArrayPush() is treated as returning NULL on failure —
 * confirm against tarray.h.
 *
 * @return TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY.
 */
static int32_t sortScanCols(SNodeList* pScanCols) {
  SArray* pArray = taosArrayInit(LIST_LENGTH(pScanCols), POINTER_BYTES);
  if (NULL == pArray) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNode* pCol = NULL;
  FOREACH(pCol, pScanCols) {
    if (NULL == taosArrayPush(pArray, &pCol)) {
      taosArrayDestroy(pArray);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }
  taosArraySort(pArray, colIdCompare);

  int32_t index = 0;
  FOREACH(pCol, pScanCols) { REPLACE_NODE(taosArrayGetP(pArray, index++)); }
  taosArrayDestroy(pArray);

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
382
// Clone pScanCols into the scan node and sort the clone by column id.
// A NULL pScanCols leaves the scan node untouched.
static int32_t createScanCols(SPhysiPlanContext* pCxt, SScanPhysiNode* pScanPhysiNode, SNodeList* pScanCols) {
  if (NULL == pScanCols) {
    return TSDB_CODE_SUCCESS;
  }

  SNodeList* pCloned = nodesCloneList(pScanCols);
  pScanPhysiNode->pScanCols = pCloned;
  if (NULL == pCloned) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  return sortScanCols(pCloned);
}

X
Xiaoyu Wang 已提交
394 395
/**
 * Common tail of the scan node builders.
 *
 * Fills in the scan columns and pseudo columns (binding both to slots of the
 * node's output data block), the conditions, and the table identity (uid,
 * table type, table name) taken from pScanLogicNode.
 *
 * On success *pPhyNode receives pScanPhysiNode. On any failure
 * pScanPhysiNode is destroyed and the error code is returned.
 */
static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode,
                                           SScanPhysiNode* pScanPhysiNode, SPhysiNode** pPhyNode) {
  int32_t code = createScanCols(pCxt, pScanPhysiNode, pScanLogicNode->pScanCols);
  if (TSDB_CODE_SUCCESS != code) {
    goto _failed;
  }

  // The data block descriptor must be set even when no column is scanned, e.g. SELECT COUNT(*) FROM t
  code = addDataBlockSlots(pCxt, pScanPhysiNode->pScanCols, pScanPhysiNode->node.pOutputDataBlockDesc);
  if (TSDB_CODE_SUCCESS != code) {
    goto _failed;
  }

  if (NULL != pScanLogicNode->pScanPseudoCols) {
    pScanPhysiNode->pScanPseudoCols = nodesCloneList(pScanLogicNode->pScanPseudoCols);
    if (NULL == pScanPhysiNode->pScanPseudoCols) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _failed;
    }
  }

  code = addDataBlockSlots(pCxt, pScanPhysiNode->pScanPseudoCols, pScanPhysiNode->node.pOutputDataBlockDesc);
  if (TSDB_CODE_SUCCESS != code) {
    goto _failed;
  }

  code = setConditionsSlotId(pCxt, (const SLogicNode*)pScanLogicNode, (SPhysiNode*)pScanPhysiNode);
  if (TSDB_CODE_SUCCESS != code) {
    goto _failed;
  }

  pScanPhysiNode->uid = pScanLogicNode->pMeta->uid;
  pScanPhysiNode->tableType = pScanLogicNode->pMeta->tableType;
  memcpy(&pScanPhysiNode->tableName, &pScanLogicNode->tableName, sizeof(SName));

  *pPhyNode = (SPhysiNode*)pScanPhysiNode;
  return code;

_failed:
  nodesDestroyNode(pScanPhysiNode);
  return code;
}

432 433
// Fill a query node address from a vgroup: the vgroup id becomes the node id
// and the endpoint set is copied.
static void vgroupInfoToNodeAddr(const SVgroupInfo* vg, SQueryNodeAddr* pNodeAddr) {
  pNodeAddr->epSet = vg->epSet;
  pNodeAddr->nodeId = vg->vgId;
}

X
Xiaoyu Wang 已提交
437
// Build a tag scan node for pScanLogicNode, using the precision recorded in
// the table meta, and complete it with createScanPhysiNodeFinalize().
static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) {
  SPhysiNode* pNode = makePhysiNode(pCxt, pScanLogicNode->pMeta->tableInfo.precision, (SLogicNode*)pScanLogicNode,
                                    QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN);
  if (NULL != pNode) {
    return createScanPhysiNodeFinalize(pCxt, pScanLogicNode, (SScanPhysiNode*)pNode, pPhyNode);
  }
  return TSDB_CODE_OUT_OF_MEMORY;
}

X
Xiaoyu Wang 已提交
446 447 448 449 450
/**
 * Build a table scan node for pScanLogicNode.
 *
 * Besides the scan parameters copied from the logic node, this sets up the
 * subplan: its execution node is taken from the first vgroup of the logic
 * node (and pushed to pCxt->pExecNodeList), its table count from that
 * vgroup, and its full database name from the scanned table's name.
 * The node is completed by createScanPhysiNodeFinalize().
 *
 * @return TSDB_CODE_SUCCESS or an error code; the node is destroyed when the
 *         dynamic scan function list cannot be cloned.
 */
static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
                                        SPhysiNode** pPhyNode) {
  STableScanPhysiNode* pScan = (STableScanPhysiNode*)makePhysiNode(
      pCxt, pScanLogicNode->pMeta->tableInfo.precision, (SLogicNode*)pScanLogicNode,
      QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
  if (NULL == pScan) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // scan parameters
  memcpy(pScan->scanSeq, pScanLogicNode->scanSeq, sizeof(pScanLogicNode->scanSeq));
  pScan->scanRange = pScanLogicNode->scanRange;
  pScan->ratio = pScanLogicNode->ratio;

  // subplan placement
  vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
  taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode);
  pSubplan->execNodeStat.tableNum = pScanLogicNode->pVgroupList->vgroups[0].numOfTable;
  tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);

  pScan->dataRequired = pScanLogicNode->dataRequired;
  pScan->pDynamicScanFuncs = nodesCloneList(pScanLogicNode->pDynamicScanFuncs);
  if (NULL == pScan->pDynamicScanFuncs && NULL != pScanLogicNode->pDynamicScanFuncs) {
    nodesDestroyNode(pScan);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // window parameters
  pScan->interval = pScanLogicNode->interval;
  pScan->offset = pScanLogicNode->offset;
  pScan->sliding = pScanLogicNode->sliding;
  pScan->intervalUnit = pScanLogicNode->intervalUnit;
  pScan->slidingUnit = pScanLogicNode->slidingUnit;

  return createScanPhysiNodeFinalize(pCxt, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
}

X
Xiaoyu Wang 已提交
477 478 479 480 481
/**
 * Build a system table scan node for pScanLogicNode.
 *
 * For the TSDB_INS_TABLE_USER_TABLES table the subplan's execution node is
 * taken from the first vgroup and pushed to pCxt->pExecNodeList; for every
 * other system table an address built from MNODE_HANDLE and the management
 * endpoint set is pushed instead. The node is completed by
 * createScanPhysiNodeFinalize().
 */
static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
                                              SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) {
  SSystemTableScanPhysiNode* pSysScan = (SSystemTableScanPhysiNode*)makePhysiNode(
      pCxt, pScanLogicNode->pMeta->tableInfo.precision, (SLogicNode*)pScanLogicNode,
      QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN);
  if (NULL == pSysScan) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pSysScan->showRewrite = pScanLogicNode->showRewrite;
  pSysScan->accountId = pCxt->pPlanCxt->acctId;

  const bool isUserTables = (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLES));
  if (!isUserTables) {
    SQueryNodeAddr mnodeAddr = {.nodeId = MNODE_HANDLE, .epSet = pCxt->pPlanCxt->mgmtEpSet};
    taosArrayPush(pCxt->pExecNodeList, &mnodeAddr);
  } else {
    vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
    taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode);
  }

  pSysScan->mgmtEpSet = pCxt->pPlanCxt->mgmtEpSet;
  tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);

  return createScanPhysiNodeFinalize(pCxt, pScanLogicNode, (SScanPhysiNode*)pSysScan, pPhyNode);
}

X
Xiaoyu Wang 已提交
501 502 503 504 505
// Build a stream scan node for pScanLogicNode, using the precision recorded in
// the table meta, and complete it with createScanPhysiNodeFinalize().
static int32_t createStreamScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
                                         SPhysiNode** pPhyNode) {
  SPhysiNode* pNode = makePhysiNode(pCxt, pScanLogicNode->pMeta->tableInfo.precision, (SLogicNode*)pScanLogicNode,
                                    QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
  if (NULL != pNode) {
    return createScanPhysiNodeFinalize(pCxt, pScanLogicNode, (SScanPhysiNode*)pNode, pPhyNode);
  }
  return TSDB_CODE_OUT_OF_MEMORY;
}

X
Xiaoyu Wang 已提交
512 513
// Dispatch on the scan type of pScanLogicNode to the matching scan node
// builder; an unhandled scan type yields TSDB_CODE_FAILED.
static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
                                   SPhysiNode** pPhyNode) {
  int32_t code = TSDB_CODE_FAILED;
  switch (pScanLogicNode->scanType) {
    case SCAN_TYPE_TAG:
      code = createTagScanPhysiNode(pCxt, pScanLogicNode, pPhyNode);
      break;
    case SCAN_TYPE_TABLE:
      code = createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
      break;
    case SCAN_TYPE_SYSTEM_TABLE:
      code = createSystemTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
      break;
    case SCAN_TYPE_STREAM:
      code = createStreamScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
      break;
    default:
      break;
  }
  return code;
}

X
Xiaoyu Wang 已提交
529 530 531 532
/**
 * Build a join node from pJoinLogicNode and its two children.
 *
 * The ON conditions (when present) and the targets are resolved against the
 * output data blocks of the left (first) and right (second) child; the
 * targets are then bound to slots of the join's own output block, and the
 * node conditions are resolved against that output block.
 *
 * On success *pPhyNode receives the join node; on failure it is destroyed.
 */
static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode,
                                   SPhysiNode** pPhyNode) {
  SJoinPhysiNode* pJoin = (SJoinPhysiNode*)makePhysiNode(pCxt, getPrecision(pChildren), (SLogicNode*)pJoinLogicNode,
                                                         QUERY_NODE_PHYSICAL_PLAN_JOIN);
  if (NULL == pJoin) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  const int16_t leftBlockId = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc->dataBlockId;
  const int16_t rightBlockId = ((SPhysiNode*)nodesListGetNode(pChildren, 1))->pOutputDataBlockDesc->dataBlockId;
  int32_t       code = TSDB_CODE_SUCCESS;

  pJoin->joinType = pJoinLogicNode->joinType;
  if (NULL != pJoinLogicNode->pOnConditions) {
    code = setNodeSlotId(pCxt, leftBlockId, rightBlockId, pJoinLogicNode->pOnConditions, &pJoin->pOnConditions);
    if (TSDB_CODE_SUCCESS != code) {
      goto _failed;
    }
  }

  code = setListSlotId(pCxt, leftBlockId, rightBlockId, pJoinLogicNode->node.pTargets, &pJoin->pTargets);
  if (TSDB_CODE_SUCCESS != code) {
    goto _failed;
  }

  code = addDataBlockSlots(pCxt, pJoin->pTargets, pJoin->node.pOutputDataBlockDesc);
  if (TSDB_CODE_SUCCESS != code) {
    goto _failed;
  }

  code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin);
  if (TSDB_CODE_SUCCESS != code) {
    goto _failed;
  }

  *pPhyNode = (SPhysiNode*)pJoin;
  return code;

_failed:
  nodesDestroyNode(pJoin);
  return code;
}

typedef struct SRewritePrecalcExprsCxt {
X
Xiaoyu Wang 已提交
567 568 569
  int32_t    errCode;
  int32_t    planNodeId;
  int32_t    rewriteId;
X
Xiaoyu Wang 已提交
570 571 572 573 574
  SNodeList* pPrecalcExprs;
} SRewritePrecalcExprsCxt;

/**
 * Move the expression *pNode into the pre-calculation list and replace it by
 * a column that refers to it by name.
 *
 * A clone of *pNode is appended to pCxt->pPrecalcExprs. The replacement
 * column copies the clone's result type and is named after the clone's alias;
 * when the clone has no alias, one of the form "#expr_<planNodeId>_<rewriteId>"
 * is generated and stored in the clone first. The original *pNode is
 * destroyed and *pNode is set to the column.
 *
 * The column is allocated before the clone is appended, so that a failure
 * never destroys a node already owned by the list. Every failure is recorded
 * in pCxt->errCode so the caller of the rewrite can observe it.
 *
 * @return DEAL_RES_IGNORE_CHILD on success, DEAL_RES_ERROR on failure.
 */
static EDealRes collectAndRewrite(SRewritePrecalcExprsCxt* pCxt, SNode** pNode) {
  SNode* pExpr = nodesCloneNode(*pNode);
  if (NULL == pExpr) {
    pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }
  SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
  if (NULL == pCol) {
    nodesDestroyNode(pExpr);
    pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }
  if (nodesListAppend(pCxt->pPrecalcExprs, pExpr)) {
    // the list did not take ownership, so both fresh nodes are still ours
    nodesDestroyNode((SNode*)pCol);
    nodesDestroyNode(pExpr);
    pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }

  SExprNode* pRewrittenExpr = (SExprNode*)pExpr;
  pCol->node.resType = pRewrittenExpr->resType;
  if ('\0' == pRewrittenExpr->aliasName[0]) {
    snprintf(pRewrittenExpr->aliasName, sizeof(pRewrittenExpr->aliasName), "#expr_%d_%d", pCxt->planNodeId,
             pCxt->rewriteId);
  }
  strcpy(pCol->colName, pRewrittenExpr->aliasName);

  nodesDestroyNode(*pNode);
  *pNode = (SNode*)pCol;
  return DEAL_RES_IGNORE_CHILD;
}

/**
 * Expression rewriter callback: operators, logic conditions and scalar
 * functions are handed to collectAndRewrite(); every other node is left in
 * place and the walk continues into it.
 */
static EDealRes doRewritePrecalcExprs(SNode** pNode, void* pContext) {
  const ENodeType type = nodeType(*pNode);

  bool needsPrecalc = (QUERY_NODE_OPERATOR == type || QUERY_NODE_LOGIC_CONDITION == type);
  if (!needsPrecalc && QUERY_NODE_FUNCTION == type) {
    needsPrecalc = fmIsScalarFunc(((SFunctionNode*)(*pNode))->funcId);
  }

  if (needsPrecalc) {
    return collectAndRewrite(pContext, pNode);
  }
  return DEAL_RES_CONTINUE;
}

X
Xiaoyu Wang 已提交
619 620
/**
 * Split the expressions of pList into pre-calculated expressions and a
 * rewritten list that refers to them by column.
 *
 * Every node of pList is cloned into *pRewrittenList (for a grouping set,
 * its first parameter is cloned instead). The clones are then rewritten with
 * doRewritePrecalcExprs(), which moves operators, logic conditions and scalar
 * functions into *pPrecalcExprs. Both output lists are created on demand; if
 * *pPrecalcExprs ends up empty it is destroyed and reset to NULL.
 *
 * A clone that cannot be appended to *pRewrittenList is destroyed before
 * returning, so it does not leak. Lists already handed out through the output
 * parameters stay with the caller on failure.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the rewriter's error
 *         code; a NULL pList is a no-op.
 */
static int32_t rewritePrecalcExprs(SPhysiPlanContext* pCxt, SNodeList* pList, SNodeList** pPrecalcExprs,
                                   SNodeList** pRewrittenList) {
  if (NULL == pList) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == *pPrecalcExprs) {
    *pPrecalcExprs = nodesMakeList();
    if (NULL == *pPrecalcExprs) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }
  if (NULL == *pRewrittenList) {
    *pRewrittenList = nodesMakeList();
    if (NULL == *pRewrittenList) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }
  SNode* pNode = NULL;
  FOREACH(pNode, pList) {
    SNode* pNew = NULL;
    if (QUERY_NODE_GROUPING_SET == nodeType(pNode)) {
      pNew = nodesCloneNode(nodesListGetNode(((SGroupingSetNode*)pNode)->pParameterList, 0));
    } else {
      pNew = nodesCloneNode(pNode);
    }
    if (NULL == pNew) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    if (TSDB_CODE_SUCCESS != nodesListAppend(*pRewrittenList, pNew)) {
      nodesDestroyNode(pNew);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }
  SRewritePrecalcExprsCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pPrecalcExprs = *pPrecalcExprs};
  nodesRewriteExprs(*pRewrittenList, doRewritePrecalcExprs, &cxt);
  if (0 == LIST_LENGTH(cxt.pPrecalcExprs)) {
    nodesDestroyList(cxt.pPrecalcExprs);
    *pPrecalcExprs = NULL;
  }
  return cxt.errCode;
}

X
Xiaoyu Wang 已提交
661 662
/**
 * Single-node variant of rewritePrecalcExprs().
 *
 * pNode is put into a temporary one-element list and rewritten; on success
 * *pRewritten receives the rewritten clone and only the temporary lists are
 * released. On failure the rewritten list is destroyed together with the
 * clones it holds, since nobody else refers to them.
 * NOTE(review): this relies on nodesClearList() releasing only the list and
 * nodesDestroyList() also releasing the nodes — confirm against the nodes API.
 *
 * @return TSDB_CODE_SUCCESS or an error code; a NULL pNode is a no-op.
 */
static int32_t rewritePrecalcExpr(SPhysiPlanContext* pCxt, SNode* pNode, SNodeList** pPrecalcExprs,
                                  SNode** pRewritten) {
  if (NULL == pNode) {
    return TSDB_CODE_SUCCESS;
  }

  SNodeList* pList = NULL;
  int32_t    code = nodesListMakeAppend(&pList, pNode);
  SNodeList* pRewrittenList = NULL;
  if (TSDB_CODE_SUCCESS == code) {
    code = rewritePrecalcExprs(pCxt, pList, pPrecalcExprs, &pRewrittenList);
  }
  if (TSDB_CODE_SUCCESS == code) {
    *pRewritten = nodesListGetNode(pRewrittenList, 0);
    nodesClearList(pRewrittenList);
  } else if (NULL != pRewrittenList) {
    nodesDestroyList(pRewrittenList);
  }
  nodesClearList(pList);
  return code;
}

X
Xiaoyu Wang 已提交
681 682 683 684
// Creates the physical aggregate node for pAggLogicNode.
// Steps, each executed only while the previous ones succeeded:
//   1. rewrite the group keys and the aggregate functions with rewritePrecalcExprs(),
//      sharing one pre-calculated expression list between both calls;
//   2. if there are pre-calculated expressions, run setListSlotId() on them against the
//      first child's data block and push them down into that child's output data block;
//   3. set up the group keys and the aggregate functions and add both to this node's
//      output data block;
//   4. run setConditionsSlotId() for the node.
// On success *pPhyNode receives the new node; on failure the node is destroyed.
// The three rewritten lists are temporaries and are always destroyed before returning.
static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SAggLogicNode* pAggLogicNode,
                                  SPhysiNode** pPhyNode) {
  SAggPhysiNode* pAggNode = (SAggPhysiNode*)makePhysiNode(pCxt, getPrecision(pChildren), (SLogicNode*)pAggLogicNode,
                                                          QUERY_NODE_PHYSICAL_PLAN_AGG);
  if (NULL == pAggNode) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNodeList* pPreExprs = NULL;
  SNodeList* pKeys = NULL;
  SNodeList* pFuncs = NULL;

  int32_t code = rewritePrecalcExprs(pCxt, pAggLogicNode->pGroupKeys, &pPreExprs, &pKeys);
  if (TSDB_CODE_SUCCESS == code) {
    code = rewritePrecalcExprs(pCxt, pAggLogicNode->pAggFuncs, &pPreExprs, &pFuncs);
  }

  SPhysiNode*         pFirstChild = (SPhysiNode*)nodesListGetNode(pChildren, 0);
  SDataBlockDescNode* pChildDesc = pFirstChild->pOutputDataBlockDesc;

  // push down expression to pOutputDataBlockDesc of child node
  if (TSDB_CODE_SUCCESS == code && NULL != pPreExprs) {
    code = setListSlotId(pCxt, pChildDesc->dataBlockId, -1, pPreExprs, &pAggNode->pExprs);
  }
  if (TSDB_CODE_SUCCESS == code && NULL != pPreExprs) {
    code = pushdownDataBlockSlots(pCxt, pAggNode->pExprs, pChildDesc);
  }

  // group keys
  if (TSDB_CODE_SUCCESS == code && NULL != pKeys) {
    code = setListSlotId(pCxt, pChildDesc->dataBlockId, -1, pKeys, &pAggNode->pGroupKeys);
  }
  if (TSDB_CODE_SUCCESS == code && NULL != pKeys) {
    code = addDataBlockSlots(pCxt, pAggNode->pGroupKeys, pAggNode->node.pOutputDataBlockDesc);
  }

  // aggregate functions
  if (TSDB_CODE_SUCCESS == code && NULL != pFuncs) {
    code = setListSlotId(pCxt, pChildDesc->dataBlockId, -1, pFuncs, &pAggNode->pAggFuncs);
  }
  if (TSDB_CODE_SUCCESS == code && NULL != pFuncs) {
    code = addDataBlockSlots(pCxt, pAggNode->pAggFuncs, pAggNode->node.pOutputDataBlockDesc);
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pAggLogicNode, (SPhysiNode*)pAggNode);
  }

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode(pAggNode);
  } else {
    *pPhyNode = (SPhysiNode*)pAggNode;
  }

  nodesDestroyList(pPreExprs);
  nodesDestroyList(pKeys);
  nodesDestroyList(pFuncs);

  return code;
}

X
Xiaoyu Wang 已提交
737 738 739 740
// Creates the physical projection node for pProjectLogicNode.
// limit/offset/slimit/soffset are copied from the logic node. The projection list is then
// passed through setListSlotId() against the first child's data block, added to this node's
// output data block via addDataBlockSlotsForProject() (together with the logic node's
// stmtName), and finally setConditionsSlotId() is run.
// On success *pPhyNode receives the new node; on failure the node is destroyed and the
// error code is returned.
static int32_t createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
                                      SProjectLogicNode* pProjectLogicNode, SPhysiNode** pPhyNode) {
  SProjectPhysiNode* pProj = (SProjectPhysiNode*)makePhysiNode(
      pCxt, getPrecision(pChildren), (SLogicNode*)pProjectLogicNode, QUERY_NODE_PHYSICAL_PLAN_PROJECT);
  if (NULL == pProj) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pProj->soffset = pProjectLogicNode->soffset;
  pProj->slimit = pProjectLogicNode->slimit;
  pProj->offset = pProjectLogicNode->offset;
  pProj->limit = pProjectLogicNode->limit;

  SPhysiNode* pFirstChild = (SPhysiNode*)nodesListGetNode(pChildren, 0);
  int32_t     code = setListSlotId(pCxt, pFirstChild->pOutputDataBlockDesc->dataBlockId, -1,
                                   pProjectLogicNode->pProjections, &pProj->pProjections);
  if (TSDB_CODE_SUCCESS == code) {
    code = addDataBlockSlotsForProject(pCxt, pProjectLogicNode->stmtName, pProj->pProjections,
                                       pProj->node.pOutputDataBlockDesc);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pProjectLogicNode, (SPhysiNode*)pProj);
  }

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode(pProj);
    return code;
  }

  *pPhyNode = (SPhysiNode*)pProj;
  return code;
}

X
Xiaoyu Wang 已提交
769 770 771 772
// Creates a plain exchange physical node from pExchangeLogicNode, using the logic node's
// precision and copying its srcGroupId.
// Returns TSDB_CODE_OUT_OF_MEMORY when the node cannot be created, otherwise stores the node
// in *pPhyNode and returns TSDB_CODE_SUCCESS.
static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode,
                                         SPhysiNode** pPhyNode) {
  SExchangePhysiNode* pNode = (SExchangePhysiNode*)makePhysiNode(
      pCxt, pExchangeLogicNode->precision, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE);
  if (NULL != pNode) {
    pNode->srcGroupId = pExchangeLogicNode->srcGroupId;
    *pPhyNode = (SPhysiNode*)pNode;
    return TSDB_CODE_SUCCESS;
  }
  return TSDB_CODE_OUT_OF_MEMORY;
}
X
Xiaoyu Wang 已提交
782 783 784 785
// Creates a stream-scan physical node from an exchange logic node (the variant that
// createExchangePhysiNode() selects for stream queries).
// The scan columns are a clone of the exchange node's targets; they are sorted with
// sortScanCols() and then added to the node's output data block.
// A NULL clone result is reported as TSDB_CODE_OUT_OF_MEMORY.
// On success *pPhyNode receives the new node; on failure the node is destroyed.
static int32_t createStreamScanPhysiNodeByExchange(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode,
                                                   SPhysiNode** pPhyNode) {
  SStreamScanPhysiNode* pScan = (SStreamScanPhysiNode*)makePhysiNode(
      pCxt, pExchangeLogicNode->precision, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
  if (NULL == pScan) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  pScan->pScanCols = nodesCloneList(pExchangeLogicNode->node.pTargets);
  if (NULL == pScan->pScanCols) {
    code = TSDB_CODE_OUT_OF_MEMORY;
  }

  // sort once; the list was previously sorted twice in a row, the second pass being redundant
  if (TSDB_CODE_SUCCESS == code) {
    code = sortScanCols(pScan->pScanCols);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = addDataBlockSlots(pCxt, pScan->pScanCols, pScan->node.pOutputDataBlockDesc);
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pScan;
  } else {
    nodesDestroyNode(pScan);
  }

  return code;
}

X
Xiaoyu Wang 已提交
817 818
// Dispatches exchange logic nodes: when the plan context is flagged as a stream query the
// exchange becomes a stream scan node, otherwise a regular exchange node is created.
static int32_t createExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode,
                                       SPhysiNode** pPhyNode) {
  return pCxt->pPlanCxt->streamQuery ? createStreamScanPhysiNodeByExchange(pCxt, pExchangeLogicNode, pPhyNode)
                                     : doCreateExchangePhysiNode(pCxt, pExchangeLogicNode, pPhyNode);
}

X
Xiaoyu Wang 已提交
826 827
// Common tail of the window physical-node constructors in this file.
// Rewrites the window functions with rewritePrecalcExprs(); if pre-calculated expressions
// exist they go through setListSlotId() and are added to the first child's output data
// block. pTspk goes through setNodeSlotId(), and the rewritten functions go through
// setListSlotId() and are added to this node's own output data block.
// triggerType and watermark are copied from the logic node regardless of the result.
// On success *pPhyNode receives pWindow; on failure pWindow is passed to nodesDestroyNode().
static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWinodwPhysiNode* pWindow,
                                             SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
  SNodeList* pPrecalcExprs = NULL;
  SNodeList* pFuncs = NULL;
  int32_t    code = rewritePrecalcExprs(pCxt, pWindowLogicNode->pFuncs, &pPrecalcExprs, &pFuncs);

  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  // push down expression to pOutputDataBlockDesc of child node
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pWindow->pExprs);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlots(pCxt, pWindow->pExprs, pChildTupe);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pWindowLogicNode->pTspk, &pWindow->pTspk);
  }

  if (TSDB_CODE_SUCCESS == code && NULL != pFuncs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncs, &pWindow->pFuncs);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlots(pCxt, pWindow->pFuncs, pWindow->node.pOutputDataBlockDesc);
    }
  }

  pWindow->triggerType = pWindowLogicNode->triggerType;
  pWindow->watermark = pWindowLogicNode->watermark;

  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pWindow;
  } else {
    nodesDestroyNode(pWindow);
  }

  // The rewritten lists are only inputs to setListSlotId(); release them here the same way
  // createAggPhysiNode() releases its rewritten lists, otherwise they leak on every call.
  nodesDestroyList(pPrecalcExprs);
  nodesDestroyList(pFuncs);

  return code;
}

X
Xiaoyu Wang 已提交
864 865 866 867
// Creates the physical node for an interval window.
// interval/offset/sliding and their units are copied from the logic node and pFill is cloned.
// If the logic node has a fill clause but cloning it yields NULL, the node is destroyed and
// TSDB_CODE_OUT_OF_MEMORY is returned. The remaining setup is delegated to
// createWindowPhysiNodeFinalize().
static int32_t createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
                                       SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
  SIntervalPhysiNode* pNode = (SIntervalPhysiNode*)makePhysiNode(
      pCxt, getPrecision(pChildren), (SLogicNode*)pWindowLogicNode, QUERY_NODE_PHYSICAL_PLAN_INTERVAL);
  if (NULL == pNode) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pNode->interval = pWindowLogicNode->interval;
  pNode->intervalUnit = pWindowLogicNode->intervalUnit;
  pNode->sliding = pWindowLogicNode->sliding;
  pNode->slidingUnit = pWindowLogicNode->slidingUnit;
  pNode->offset = pWindowLogicNode->offset;

  pNode->pFill = nodesCloneNode(pWindowLogicNode->pFill);
  if (NULL == pNode->pFill && NULL != pWindowLogicNode->pFill) {
    // a fill clause was requested but could not be cloned
    nodesDestroyNode(pNode);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return createWindowPhysiNodeFinalize(pCxt, pChildren, &pNode->window, pWindowLogicNode, pPhyNode);
}

X
Xiaoyu Wang 已提交
887 888 889 890
// Creates the physical node for a session window: copies sessionGap from the logic node
// into the node's gap field and delegates the rest to createWindowPhysiNodeFinalize().
// Returns TSDB_CODE_OUT_OF_MEMORY when the node cannot be created.
static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
                                            SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
  SSessionWinodwPhysiNode* pNode = (SSessionWinodwPhysiNode*)makePhysiNode(
      pCxt, getPrecision(pChildren), (SLogicNode*)pWindowLogicNode, QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW);
  if (NULL != pNode) {
    pNode->gap = pWindowLogicNode->sessionGap;
    return createWindowPhysiNodeFinalize(pCxt, pChildren, &pNode->window, pWindowLogicNode, pPhyNode);
  }
  return TSDB_CODE_OUT_OF_MEMORY;
}

X
Xiaoyu Wang 已提交
900 901 902 903
// Creates the physical node for a state window.
// The state expression is rewritten with rewritePrecalcExpr(); pre-calculated expressions
// (if any) go through setListSlotId() and are added to the first child's output data block.
// The rewritten state key goes through setNodeSlotId() and is added to this node's output
// data block. On failure the node is destroyed and the error code returned; otherwise the
// remaining setup is delegated to createWindowPhysiNodeFinalize().
static int32_t createStateWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
                                          SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
  SStateWinodwPhysiNode* pState = (SStateWinodwPhysiNode*)makePhysiNode(
      pCxt, getPrecision(pChildren), (SLogicNode*)pWindowLogicNode, QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW);
  if (NULL == pState) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNodeList* pPrecalcExprs = NULL;
  SNode*     pStateKey = NULL;
  int32_t    code = rewritePrecalcExpr(pCxt, pWindowLogicNode->pStateExpr, &pPrecalcExprs, &pStateKey);

  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  // push down expression to pOutputDataBlockDesc of child node
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pState->window.pExprs);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlots(pCxt, pState->window.pExprs, pChildTupe);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pStateKey, &pState->pStateKey);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlot(pCxt, &pState->pStateKey, pState->window.node.pOutputDataBlockDesc);
    }
  }

  // pPrecalcExprs and pStateKey are temporaries produced by rewritePrecalcExpr(); they were
  // never released before. NOTE(review): this relies on setListSlotId()/setNodeSlotId()
  // storing their own copies in the output arguments, as createAggPhysiNode() already
  // assumes when it destroys its rewritten lists after the same calls.
  nodesDestroyList(pPrecalcExprs);
  nodesDestroyNode(pStateKey);

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode(pState);
    return code;
  }

  return createWindowPhysiNodeFinalize(pCxt, pChildren, &pState->window, pWindowLogicNode, pPhyNode);
}

X
Xiaoyu Wang 已提交
936 937
// Dispatches a window logic node to the constructor matching its winType
// (interval, session or state). Any other window type yields TSDB_CODE_FAILED.
static int32_t createWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode,
                                     SPhysiNode** pPhyNode) {
  if (WINDOW_TYPE_INTERVAL == pWindowLogicNode->winType) {
    return createIntervalPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
  }
  if (WINDOW_TYPE_SESSION == pWindowLogicNode->winType) {
    return createSessionWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
  }
  if (WINDOW_TYPE_STATE == pWindowLogicNode->winType) {
    return createStateWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
  }
  // unsupported window type
  return TSDB_CODE_FAILED;
}

X
Xiaoyu Wang 已提交
951 952 953 954
// Creates the physical sort node for pSortLogicNode.
// The sort keys are rewritten with rewritePrecalcExprs(); pre-calculated expressions (if any)
// go through setListSlotId() and are added to the first child's output data block. The
// rewritten sort keys and the logic node's targets go through setListSlotId(), and the
// targets are added to this node's output data block.
// On success *pPhyNode receives the new node; on failure the node is destroyed.
static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SSortLogicNode* pSortLogicNode,
                                   SPhysiNode** pPhyNode) {
  SSortPhysiNode* pSort = (SSortPhysiNode*)makePhysiNode(pCxt, getPrecision(pChildren), (SLogicNode*)pSortLogicNode,
                                                         QUERY_NODE_PHYSICAL_PLAN_SORT);
  if (NULL == pSort) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNodeList* pPrecalcExprs = NULL;
  SNodeList* pSortKeys = NULL;
  int32_t    code = rewritePrecalcExprs(pCxt, pSortLogicNode->pSortKeys, &pPrecalcExprs, &pSortKeys);

  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  // push down expression to pOutputDataBlockDesc of child node
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pSort->pExprs);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlots(pCxt, pSort->pExprs, pChildTupe);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pSortKeys, &pSort->pSortKeys);
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pSortLogicNode->node.pTargets, &pSort->pTargets);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlots(pCxt, pSort->pTargets, pSort->node.pOutputDataBlockDesc);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pSort;
  } else {
    nodesDestroyNode(pSort);
  }

  // The rewritten lists are only inputs to setListSlotId(); release them here the same way
  // createAggPhysiNode() releases its rewritten lists, otherwise they leak on every call.
  nodesDestroyList(pPrecalcExprs);
  nodesDestroyList(pSortKeys);

  return code;
}

X
Xiaoyu Wang 已提交
992 993 994 995
// Creates the physical partition node for pPartLogicNode.
// The partition keys are rewritten with rewritePrecalcExprs(); pre-calculated expressions
// (if any) go through setListSlotId() and are added to the first child's output data block.
// The rewritten partition keys and the logic node's targets go through setListSlotId(), and
// the targets are added to this node's output data block.
// On success *pPhyNode receives the new node; on failure the node is destroyed.
static int32_t createPartitionPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
                                        SPartitionLogicNode* pPartLogicNode, SPhysiNode** pPhyNode) {
  SPartitionPhysiNode* pPart = (SPartitionPhysiNode*)makePhysiNode(
      pCxt, getPrecision(pChildren), (SLogicNode*)pPartLogicNode, QUERY_NODE_PHYSICAL_PLAN_PARTITION);
  if (NULL == pPart) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNodeList* pPrecalcExprs = NULL;
  SNodeList* pPartitionKeys = NULL;
  int32_t    code = rewritePrecalcExprs(pCxt, pPartLogicNode->pPartitionKeys, &pPrecalcExprs, &pPartitionKeys);

  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  // push down expression to pOutputDataBlockDesc of child node
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pPart->pExprs);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlots(pCxt, pPart->pExprs, pChildTupe);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPartitionKeys, &pPart->pPartitionKeys);
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPartLogicNode->node.pTargets, &pPart->pTargets);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlots(pCxt, pPart->pTargets, pPart->node.pOutputDataBlockDesc);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pPart;
  } else {
    nodesDestroyNode(pPart);
  }

  // The rewritten lists are only inputs to setListSlotId(); release them here the same way
  // createAggPhysiNode() releases its rewritten lists, otherwise they leak on every call.
  nodesDestroyList(pPrecalcExprs);
  nodesDestroyList(pPartitionKeys);

  return code;
}

X
Xiaoyu Wang 已提交
1033 1034
// Dispatches on the logic node type and calls the matching physical-node constructor.
// Scan nodes receive pSubplan, exchange nodes receive neither pSubplan nor pChildren, and
// all other supported node types receive the already-built pChildren.
// Unsupported node types yield TSDB_CODE_FAILED.
static int32_t doCreatePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, SSubplan* pSubplan,
                                 SNodeList* pChildren, SPhysiNode** pPhyNode) {
  int32_t code = TSDB_CODE_FAILED;

  switch (nodeType(pLogicNode)) {
    case QUERY_NODE_LOGIC_PLAN_SCAN:
      code = createScanPhysiNode(pCxt, pSubplan, (SScanLogicNode*)pLogicNode, pPhyNode);
      break;
    case QUERY_NODE_LOGIC_PLAN_JOIN:
      code = createJoinPhysiNode(pCxt, pChildren, (SJoinLogicNode*)pLogicNode, pPhyNode);
      break;
    case QUERY_NODE_LOGIC_PLAN_AGG:
      code = createAggPhysiNode(pCxt, pChildren, (SAggLogicNode*)pLogicNode, pPhyNode);
      break;
    case QUERY_NODE_LOGIC_PLAN_PROJECT:
      code = createProjectPhysiNode(pCxt, pChildren, (SProjectLogicNode*)pLogicNode, pPhyNode);
      break;
    case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
      code = createExchangePhysiNode(pCxt, (SExchangeLogicNode*)pLogicNode, pPhyNode);
      break;
    case QUERY_NODE_LOGIC_PLAN_WINDOW:
      code = createWindowPhysiNode(pCxt, pChildren, (SWindowLogicNode*)pLogicNode, pPhyNode);
      break;
    case QUERY_NODE_LOGIC_PLAN_SORT:
      code = createSortPhysiNode(pCxt, pChildren, (SSortLogicNode*)pLogicNode, pPhyNode);
      break;
    case QUERY_NODE_LOGIC_PLAN_PARTITION:
      code = createPartitionPhysiNode(pCxt, pChildren, (SPartitionLogicNode*)pLogicNode, pPhyNode);
      break;
    default:
      // unsupported logic node type: keep TSDB_CODE_FAILED
      break;
  }

  return code;
}

X
Xiaoyu Wang 已提交
1059 1060
// Recursively converts pLogicNode and its children into physical nodes.
// Children are converted first and collected in a list, then the node itself is created via
// doCreatePhysiNode(). On success the child list is attached to the new node and every
// child's pParent is pointed at it; on failure the child list is destroyed.
// Returns TSDB_CODE_SUCCESS or the first error encountered.
static int32_t createPhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, SSubplan* pSubplan,
                               SPhysiNode** pPhyNode) {
  SNodeList* pChildren = nodesMakeList();
  if (NULL == pChildren) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  SNode* pLogicChild = NULL;
  FOREACH(pLogicChild, pLogicNode->pChildren) {
    SPhysiNode* pChild = NULL;
    code = createPhysiNode(pCxt, (SLogicNode*)pLogicChild, pSubplan, &pChild);
    if (TSDB_CODE_SUCCESS == code) {
      code = nodesListStrictAppend(pChildren, pChild);
    }
    if (TSDB_CODE_SUCCESS != code) {
      // stop at the first failure; continuing would let a later successful child
      // overwrite the error code and build a node on top of an incomplete child list
      break;
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = doCreatePhysiNode(pCxt, pLogicNode, pSubplan, pChildren, pPhyNode);
  }

  if (TSDB_CODE_SUCCESS == code) {
    (*pPhyNode)->pChildren = pChildren;
    SNode* pChild = NULL;
    FOREACH(pChild, (*pPhyNode)->pChildren) { ((SPhysiNode*)pChild)->pParent = (*pPhyNode); }
  } else {
    nodesDestroyList(pChildren);
  }

  return code;
}

X
Xiaoyu Wang 已提交
1092
// Creates the data-inserter sink for a modify subplan.
// numOfTables and size are copied from pBlocks and the pData pointers of the new node and
// pBlocks are exchanged with TSWAP.
// Returns TSDB_CODE_OUT_OF_MEMORY when the node cannot be created, otherwise stores the node
// in *pSink and returns TSDB_CODE_SUCCESS.
static int32_t createDataInserter(SPhysiPlanContext* pCxt, SVgDataBlocks* pBlocks, SDataSinkNode** pSink) {
  SDataInserterNode* pNode = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_INSERT);
  if (NULL != pNode) {
    TSWAP(pNode->pData, pBlocks->pData);
    pNode->size = pBlocks->size;
    pNode->numOfTables = pBlocks->numOfTables;

    *pSink = (SDataSinkNode*)pNode;
    return TSDB_CODE_SUCCESS;
  }
  return TSDB_CODE_OUT_OF_MEMORY;
}

X
Xiaoyu Wang 已提交
1106
// Creates the data-dispatcher sink for a query subplan. The sink's input data block
// description is a clone of pRoot's output data block description.
// Returns TSDB_CODE_OUT_OF_MEMORY if the node cannot be created or the clone is NULL (the
// node is destroyed in the latter case); otherwise stores the node in *pSink.
static int32_t createDataDispatcher(SPhysiPlanContext* pCxt, const SPhysiNode* pRoot, SDataSinkNode** pSink) {
  SDataDispatcherNode* pNode = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_DISPATCH);
  if (NULL == pNode) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pNode->sink.pInputDataBlockDesc = nodesCloneNode(pRoot->pOutputDataBlockDesc);
  if (NULL != pNode->sink.pInputDataBlockDesc) {
    *pSink = (SDataSinkNode*)pNode;
    return TSDB_CODE_SUCCESS;
  }

  nodesDestroyNode(pNode);
  return TSDB_CODE_OUT_OF_MEMORY;
}

X
Xiaoyu Wang 已提交
1122
// Allocates a physical subplan and copies id, subplanType and level from the logic subplan.
// Returns NULL if the node cannot be created.
static SSubplan* makeSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan) {
  SSubplan* pNew = nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
  if (NULL != pNew) {
    pNew->level = pLogicSubplan->level;
    pNew->subplanType = pLogicSubplan->subplanType;
    pNew->id = pLogicSubplan->id;
  }
  return pNew;
}

X
Xiaoyu Wang 已提交
1133
// Creates the physical subplan for pLogicSubplan.
// Modify subplans take msgType and the vgroup epSet from the logic node, push the execNode
// onto pCxt->pExecNodeList and get a data-inserter sink.
// All other subplans use TDMT_VND_QUERY, build their physical node tree, and get a
// data-dispatcher sink unless the plan context is a stream or topic query.
// On success *pPhysiSubplan receives the subplan; on failure it is destroyed.
static int32_t createPhysiSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SSubplan** pPhysiSubplan) {
  SSubplan* pNewPlan = makeSubplan(pCxt, pLogicSubplan);
  if (NULL == pNewPlan) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  if (SUBPLAN_TYPE_MODIFY != pLogicSubplan->subplanType) {
    pNewPlan->msgType = TDMT_VND_QUERY;
    code = createPhysiNode(pCxt, pLogicSubplan->pNode, pNewPlan, &pNewPlan->pNode);
    if (TSDB_CODE_SUCCESS == code && !(pCxt->pPlanCxt->streamQuery || pCxt->pPlanCxt->topicQuery)) {
      code = createDataDispatcher(pCxt, pNewPlan->pNode, &pNewPlan->pDataSink);
    }
  } else {
    SVnodeModifLogicNode* pModifNode = (SVnodeModifLogicNode*)pLogicSubplan->pNode;
    pNewPlan->msgType = pModifNode->msgType;
    pNewPlan->execNode.epSet = pModifNode->pVgDataBlocks->vg.epSet;
    taosArrayPush(pCxt->pExecNodeList, &pNewPlan->execNode);
    code = createDataInserter(pCxt, pModifNode->pVgDataBlocks, &pNewPlan->pDataSink);
  }

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode(pNewPlan);
    return code;
  }

  *pPhysiSubplan = pNewPlan;
  return code;
}

X
Xiaoyu Wang 已提交
1164 1165 1166 1167 1168 1169 1170 1171 1172
// Allocates an empty physical query plan with an empty subplan list and the queryId taken
// from the plan context. Returns NULL if either allocation fails.
static SQueryPlan* makeQueryPhysiPlan(SPhysiPlanContext* pCxt) {
  SQueryPlan* pNewPlan = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN);
  if (NULL == pNewPlan) {
    return NULL;
  }

  pNewPlan->pSubplans = nodesMakeList();
  if (NULL != pNewPlan->pSubplans) {
    pNewPlan->queryId = pCxt->pPlanCxt->queryId;
    return pNewPlan;
  }

  nodesDestroyNode(pNewPlan);
  return NULL;
}

// Appends pSubplan to the group node stored at index `level` of pSubplans.
// When `level` is not below the current list length a new QUERY_NODE_NODE_LIST group is
// created and appended to pSubplans; the group's inner list is created on first use.
// Returns TSDB_CODE_OUT_OF_MEMORY on any allocation/append failure of the group, otherwise
// the result of appending pSubplan to the group's list.
static int32_t pushSubplan(SPhysiPlanContext* pCxt, SNodeptr pSubplan, int32_t level, SNodeList* pSubplans) {
  SNodeListNode* pLevelGroup;
  if (level < LIST_LENGTH(pSubplans)) {
    pLevelGroup = nodesListGetNode(pSubplans, level);
  } else {
    pLevelGroup = nodesMakeNode(QUERY_NODE_NODE_LIST);
    if (NULL == pLevelGroup) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pSubplans, pLevelGroup)) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  if (NULL == pLevelGroup->pNodeList) {
    pLevelGroup->pNodeList = nodesMakeList();
    if (NULL == pLevelGroup->pNodeList) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  return nodesListStrictAppend(pLevelGroup->pNodeList, pSubplan);
}

X
Xiaoyu Wang 已提交
1200 1201
// Recursively builds the physical subplan for pLogicSubplan and all of its children.
// The new subplan is pushed into pQueryPlan->pSubplans at the logic subplan's level and,
// when pParent is given, linked with it in both directions (pChildren / pParents).
// numOfSubplans is incremented whenever the subplan itself was created, before the result
// of pushSubplan() is checked.
// Returns the first error encountered, or TSDB_CODE_SUCCESS.
static int32_t buildPhysiPlan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SSubplan* pParent,
                              SQueryPlan* pQueryPlan) {
  SSubplan* pNewSubplan = NULL;
  int32_t   code = createPhysiSubplan(pCxt, pLogicSubplan, &pNewSubplan);

  if (TSDB_CODE_SUCCESS == code) {
    code = pushSubplan(pCxt, pNewSubplan, pLogicSubplan->level, pQueryPlan->pSubplans);
    ++(pQueryPlan->numOfSubplans);
  }

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode(pNewSubplan);
    return code;
  }

  // link parent and child in both directions
  if (NULL != pParent) {
    code = nodesListMakeAppend(&pParent->pChildren, pNewSubplan);
    if (TSDB_CODE_SUCCESS == code) {
      code = nodesListMakeAppend(&pNewSubplan->pParents, pParent);
    }
  }
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  SNode* pLogicChild = NULL;
  FOREACH(pLogicChild, pLogicSubplan->pChildren) {
    code = buildPhysiPlan(pCxt, (SLogicSubplan*)pLogicChild, pNewSubplan, pQueryPlan);
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
  }

  return code;
}

X
Xiaoyu Wang 已提交
1235 1236 1237 1238
// Builds the complete physical plan: creates an empty query plan and runs buildPhysiPlan()
// for every top-level logic subplan, stopping at the first failure.
// On success *pPhysiPlan receives the plan; on failure the plan is destroyed.
static int32_t doCreatePhysiPlan(SPhysiPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPhysiPlan) {
  SQueryPlan* pNewPlan = makeQueryPhysiPlan(pCxt);
  if (NULL == pNewPlan) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  SNode*  pTopSubplan = NULL;
  FOREACH(pTopSubplan, pLogicPlan->pTopSubplans) {
    code = buildPhysiPlan(pCxt, (SLogicSubplan*)pTopSubplan, NULL, pNewPlan);
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
  }

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode(pNewPlan);
    return code;
  }

  *pPhysiPlan = pNewPlan;
  return code;
}
X
Xiaoyu Wang 已提交
1259

X
Xiaoyu Wang 已提交
1260
static void destoryLocationHash(void* p) {
X
Xiaoyu Wang 已提交
1261
  SHashObj*   pHash = *(SHashObj**)p;
X
bugfix  
Xiaoyu Wang 已提交
1262 1263 1264 1265 1266
  SSlotIndex* pIndex = taosHashIterate(pHash, NULL);
  while (NULL != pIndex) {
    taosArrayDestroy(pIndex->pSlotIdsInfo);
    pIndex = taosHashIterate(pHash, pIndex);
  }
X
Xiaoyu Wang 已提交
1267 1268 1269 1270 1271 1272 1273
  taosHashCleanup(pHash);
}

// Releases the resources held by the physical plan context: destroys pLocationHelper,
// running destoryLocationHash() on each of its elements.
static void destoryPhysiPlanContext(SPhysiPlanContext* pCxt) {
  SArray* pHelper = pCxt->pLocationHelper;
  taosArrayDestroyEx(pHelper, destoryLocationHash);
}

1274 1275 1276 1277 1278
// Fills pPlan->explainInfo from the AST root. For any statement other than EXPLAIN the mode
// is EXPLAIN_MODE_DISABLE. For EXPLAIN the mode is ANALYZE or STATIC depending on the
// statement's analyze flag, and verbose/ratio are copied from its options.
static void setExplainInfo(SPlanContext* pCxt, SQueryPlan* pPlan) {
  if (QUERY_NODE_EXPLAIN_STMT != nodeType(pCxt->pAstRoot)) {
    pPlan->explainInfo.mode = EXPLAIN_MODE_DISABLE;
    return;
  }

  SExplainStmt* pExplain = (SExplainStmt*)pCxt->pAstRoot;
  if (pExplain->analyze) {
    pPlan->explainInfo.mode = EXPLAIN_MODE_ANALYZE;
  } else {
    pPlan->explainInfo.mode = EXPLAIN_MODE_STATIC;
  }
  pPlan->explainInfo.verbose = pExplain->pOptions->verbose;
  pPlan->explainInfo.ratio = pExplain->pOptions->ratio;
}

X
Xiaoyu Wang 已提交
1285
// Entry point: converts a logic plan into a physical query plan.
// Sets up a SPhysiPlanContext (location helper array, caller-supplied exec node list). When
// tsQueryPolicy is above QUERY_POLICY_VNODE, catalogGetQnodeList() is called with
// pExecNodeList first. The plan is then built and the explain info filled in.
// Returns TSDB_CODE_OUT_OF_MEMORY if the location helper cannot be allocated, otherwise the
// result of the steps above. The context is always cleaned up before returning.
int32_t createPhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan, SArray* pExecNodeList) {
  SPhysiPlanContext physiCxt = {0};
  physiCxt.pPlanCxt = pCxt;
  physiCxt.errCode = TSDB_CODE_SUCCESS;
  physiCxt.nextDataBlockId = 0;
  physiCxt.pLocationHelper = taosArrayInit(32, POINTER_BYTES);
  physiCxt.pExecNodeList = pExecNodeList;
  if (NULL == physiCxt.pLocationHelper) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  if (tsQueryPolicy > QUERY_POLICY_VNODE) {
    code = catalogGetQnodeList(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, pExecNodeList);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = doCreatePhysiPlan(&physiCxt, pLogicPlan, pPlan);
  }
  if (TSDB_CODE_SUCCESS == code) {
    setExplainInfo(pCxt, *pPlan);
  }

  destoryPhysiPlanContext(&physiCxt);
  return code;
}