planPhysiCreater.c 49.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

X
Xiaoyu Wang 已提交
16
#include "planInt.h"
X
Xiaoyu Wang 已提交
17

X
Xiaoyu Wang 已提交
18
#include "catalog.h"
X
Xiaoyu Wang 已提交
19
#include "functionMgt.h"
20
#include "systable.h"
X
Xiaoyu Wang 已提交
21
#include "tglobal.h"
X
Xiaoyu Wang 已提交
22

X
bugfix  
Xiaoyu Wang 已提交
23 24
/* One slot registered under a slot key, together with a flag telling whether it was already handed out. */
typedef struct SSlotIdInfo {
  int16_t slotId;  // slot id inside the owning data block descriptor
  bool    set;     // starts false; getUnsetSlotId() flips it to true when it returns this slot
} SSlotIdInfo;

X
Xiaoyu Wang 已提交
28 29
typedef struct SSlotIndex {
  int16_t dataBlockId;
X
Xiaoyu Wang 已提交
30
  SArray* pSlotIdsInfo;  // duplicate name slot
X
Xiaoyu Wang 已提交
31 32 33
} SSlotIndex;

typedef struct SPhysiPlanContext {
X
Xiaoyu Wang 已提交
34
  SPlanContext* pPlanCxt;
X
Xiaoyu Wang 已提交
35 36 37 38
  int32_t       errCode;
  int16_t       nextDataBlockId;
  SArray*       pLocationHelper;
  SArray*       pExecNodeList;
X
Xiaoyu Wang 已提交
39 40
} SPhysiPlanContext;

X
Xiaoyu Wang 已提交
41
/*
 * Formats the hash key identifying the slot of pNode into pKey.
 *
 * Column nodes:
 *   - "<pStmtName>.<aliasName>"   when pStmtName is a non-empty string,
 *   - "<colName>"                 when the column has no table alias,
 *   - "<tableAlias>.<colName>"    otherwise.
 * Any other node is read as an SExprNode:
 *   - "<pStmtName>.<aliasName>"   when pStmtName is a non-empty string,
 *   - "<aliasName>"               otherwise.
 *
 * The key is written with sprintf, so this function does not bound the write; the caller has to supply a
 * buffer that is large enough for the longest key. The return value is sprintf's result, i.e. the number
 * of characters written, not counting the terminating '\0'.
 */
static int32_t getSlotKey(SNode* pNode, const char* pStmtName, char* pKey) {
  if (QUERY_NODE_COLUMN == nodeType(pNode)) {
    SColumnNode* pCol = (SColumnNode*)pNode;
    if (NULL != pStmtName && '\0' != pStmtName[0]) {
      return sprintf(pKey, "%s.%s", pStmtName, pCol->node.aliasName);
    }
    if ('\0' == pCol->tableAlias[0]) {
      return sprintf(pKey, "%s", pCol->colName);
    }
    return sprintf(pKey, "%s.%s", pCol->tableAlias, pCol->colName);
  }

  if (NULL != pStmtName && '\0' != pStmtName[0]) {
    return sprintf(pKey, "%s.%s", pStmtName, ((SExprNode*)pNode)->aliasName);
  }
  return sprintf(pKey, "%s", ((SExprNode*)pNode)->aliasName);
}

X
bugfix  
Xiaoyu Wang 已提交
59
/*
 * Allocates a slot descriptor for pNode.
 *
 * The descriptor gets the given slotId, the result type of pNode (read as an SExprNode), reserve = false
 * and the given output flag. pCxt is not used. Returns NULL when the node cannot be allocated.
 */
static SNode* createSlotDesc(SPhysiPlanContext* pCxt, const SNode* pNode, int16_t slotId, bool output) {
  SSlotDescNode* pSlot = (SSlotDescNode*)nodesMakeNode(QUERY_NODE_SLOT_DESC);
  if (NULL == pSlot) {
    return NULL;
  }
  pSlot->slotId = slotId;
  pSlot->dataType = ((SExprNode*)pNode)->resType;
  pSlot->reserve = false;
  pSlot->output = output;
  return (SNode*)pSlot;
}

X
bugfix  
Xiaoyu Wang 已提交
71
/*
 * Wraps pNode in a newly allocated target node that refers to slot slotId of data block dataBlockId.
 *
 * pNode is stored by pointer in the target (it is not cloned). On success the target is returned through
 * pOutput; when the allocation fails TSDB_CODE_OUT_OF_MEMORY is returned and *pOutput is left untouched.
 */
static int32_t createTarget(SNode* pNode, int16_t dataBlockId, int16_t slotId, SNode** pOutput) {
  STargetNode* pTarget = (STargetNode*)nodesMakeNode(QUERY_NODE_TARGET);
  if (NULL == pTarget) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pTarget->dataBlockId = dataBlockId;
  pTarget->slotId = slotId;
  pTarget->pExpr = pNode;

  *pOutput = (SNode*)pTarget;
  return TSDB_CODE_SUCCESS;
}

X
bugfix  
Xiaoyu Wang 已提交
85
/*
 * Registers slotId under the key pName (len bytes) in the slot hash pHash.
 *
 * If the key already exists, the slot is appended to the existing entry's slot array, so one name can map
 * to several slots. Otherwise a new SSlotIndex holding a one-element slot array is inserted.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when an array cannot be created or grown, or the
 * result of taosHashPut. The freshly created slot array is released again on every failure path so that
 * it does not leak.
 */
static int32_t putSlotToHashImpl(int16_t dataBlockId, int16_t slotId, const char* pName, int32_t len, SHashObj* pHash) {
  SSlotIdInfo info = {.slotId = slotId, .set = false};

  SSlotIndex* pIndex = taosHashGet(pHash, pName, len);
  if (NULL != pIndex) {
    if (NULL == taosArrayPush(pIndex->pSlotIdsInfo, &info)) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    return TSDB_CODE_SUCCESS;
  }

  SSlotIndex index = {.dataBlockId = dataBlockId, .pSlotIdsInfo = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SSlotIdInfo))};
  if (NULL == index.pSlotIdsInfo) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  if (NULL == taosArrayPush(index.pSlotIdsInfo, &info)) {
    taosArrayDestroy(index.pSlotIdsInfo);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = taosHashPut(pHash, pName, len, &index, sizeof(SSlotIndex));
  if (TSDB_CODE_SUCCESS != code) {
    // the hash did not take the entry, so nothing else references the array
    taosArrayDestroy(index.pSlotIdsInfo);
  }
  return code;
}

/*
 * Registers slotId in pHash under the slot key of pNode.
 *
 * The key is built by getSlotKey() without a statement name and then handed to putSlotToHashImpl(),
 * whose result is returned.
 */
static int32_t putSlotToHash(int16_t dataBlockId, int16_t slotId, SNode* pNode, SHashObj* pHash) {
  char    name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN];
  int32_t len = getSlotKey(pNode, NULL, name);
  return putSlotToHashImpl(dataBlockId, slotId, name, len, pHash);
}

X
Xiaoyu Wang 已提交
108 109
/*
 * Creates the slot hash of a data block descriptor and stores it in pCxt->pLocationHelper at index
 * dataBlockId.
 *
 * capacity is passed to taosHashInit as the initial capacity. On success the hash is also returned through
 * pDescHash. When the hash cannot be created or inserted, TSDB_CODE_OUT_OF_MEMORY is returned; a hash that
 * was created but could not be inserted is cleaned up first.
 */
static int32_t createDataBlockDescHash(SPhysiPlanContext* pCxt, int32_t capacity, int16_t dataBlockId,
                                       SHashObj** pDescHash) {
  SHashObj* pHash = taosHashInit(capacity, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (NULL == pHash) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  if (NULL == taosArrayInsert(pCxt->pLocationHelper, dataBlockId, &pHash)) {
    taosHashCleanup(pHash);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  *pDescHash = pHash;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
123 124
/*
 * Creates the initial slot list of pDataBlockDesc from pList.
 *
 * Each node of pList gets one slot descriptor flagged as output, with slot ids assigned in list order
 * starting at 0, and is registered in pHash under its slot key. The result size of every node is added to
 * both totalRowSize and outputRowSize. Processing stops at the first failure and that error code is
 * returned.
 */
static int32_t buildDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc,
                                   SHashObj* pHash) {
  pDataBlockDesc->pSlots = nodesMakeList();
  if (NULL == pDataBlockDesc->pSlots) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  int16_t slotId = 0;
  SNode*  pNode = NULL;
  FOREACH(pNode, pList) {
    code = nodesListStrictAppend(pDataBlockDesc->pSlots, createSlotDesc(pCxt, pNode, slotId, true));
    if (TSDB_CODE_SUCCESS == code) {
      code = putSlotToHash(pDataBlockDesc->dataBlockId, slotId, pNode, pHash);
    }
    if (TSDB_CODE_SUCCESS == code) {
      pDataBlockDesc->totalRowSize += ((SExprNode*)pNode)->resType.bytes;
      pDataBlockDesc->outputRowSize += ((SExprNode*)pNode)->resType.bytes;
      ++slotId;
    } else {
      break;
    }
  }
  return code;
}

/*
 * Creates a data block descriptor with one output slot per node of pList.
 *
 * The descriptor takes the next data block id from pCxt, gets its slot hash via createDataBlockDescHash()
 * and its slots via buildDataBlockSlots(). On success it is returned through pDataBlockDesc; on failure
 * the descriptor is destroyed and the error code is returned.
 */
static int32_t createDataBlockDesc(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode** pDataBlockDesc) {
  SDataBlockDescNode* pDesc = nodesMakeNode(QUERY_NODE_DATABLOCK_DESC);
  if (NULL == pDesc) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pDesc->dataBlockId = pCxt->nextDataBlockId++;

  SHashObj* pHash = NULL;
  int32_t   code = createDataBlockDescHash(pCxt, LIST_LENGTH(pList), pDesc->dataBlockId, &pHash);
  if (TSDB_CODE_SUCCESS == code) {
    code = buildDataBlockSlots(pCxt, pList, pDesc, pHash);
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pDataBlockDesc = pDesc;
  } else {
    nodesDestroyNode(pDesc);
  }

  return code;
}

X
bugfix  
Xiaoyu Wang 已提交
171 172 173 174 175 176 177 178 179 180 181 182
/*
 * Hands out a slot id from pSlotIdsInfo (an array of SSlotIdInfo).
 *
 * The first entry whose `set` flag is still false is marked as set and its slot id is returned. When every
 * entry has already been handed out, the slot id of the first entry is returned and nothing is modified.
 */
static int16_t getUnsetSlotId(const SArray* pSlotIdsInfo) {
  const int32_t total = taosArrayGetSize(pSlotIdsInfo);
  int32_t       idx = 0;
  while (idx < total) {
    SSlotIdInfo* pCandidate = taosArrayGet(pSlotIdsInfo, idx);
    if (pCandidate->set) {
      ++idx;
      continue;
    }
    pCandidate->set = true;
    return pCandidate->slotId;
  }

  SSlotIdInfo* pFirst = taosArrayGet(pSlotIdsInfo, 0);
  return pFirst->slotId;
}

X
Xiaoyu Wang 已提交
183 184
/*
 * Makes sure every expression in pList has a slot in pDataBlockDesc and replaces each list element with a
 * target node pointing at that slot.
 *
 * For an ORDER BY element the slot is looked up / created for the wrapped expression; every other element
 * is used as is. The slot key is built with getSlotKey() using pStmtName. When the key is not registered
 * yet, a new slot (flagged with `output`) is appended, registered in the slot hash, and its result size is
 * added to totalRowSize (and to outputRowSize when `output` is true). When the key is already registered,
 * a slot is taken from the existing entry via getUnsetSlotId().
 *
 * A NULL pList is a no-op. Processing stops at the first failure and that error code is returned.
 */
static int32_t addDataBlockSlotsImpl(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc,
                                     const char* pStmtName, bool output) {
  if (NULL == pList) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t   code = TSDB_CODE_SUCCESS;
  SHashObj* pHash = taosArrayGetP(pCxt->pLocationHelper, pDataBlockDesc->dataBlockId);
  // Slot ids are positions in pSlots. The hash size cannot be used here: a key that was registered more
  // than once contributes a single hash entry but several slots, so it would yield an id already in use.
  int16_t   nextSlotId = LIST_LENGTH(pDataBlockDesc->pSlots), slotId = 0;
  SNode*    pNode = NULL;
  FOREACH(pNode, pList) {
    SNode*      pExpr = QUERY_NODE_ORDER_BY_EXPR == nodeType(pNode) ? ((SOrderByExprNode*)pNode)->pExpr : pNode;
    char        name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN] = {0};
    int32_t     len = getSlotKey(pExpr, pStmtName, name);
    SSlotIndex* pIndex = taosHashGet(pHash, name, len);
    if (NULL == pIndex) {
      code = nodesListStrictAppend(pDataBlockDesc->pSlots, createSlotDesc(pCxt, pExpr, nextSlotId, output));
      if (TSDB_CODE_SUCCESS == code) {
        code = putSlotToHashImpl(pDataBlockDesc->dataBlockId, nextSlotId, name, len, pHash);
      }
      if (TSDB_CODE_SUCCESS == code) {
        // account for the slot only once it really exists
        pDataBlockDesc->totalRowSize += ((SExprNode*)pExpr)->resType.bytes;
        if (output) {
          pDataBlockDesc->outputRowSize += ((SExprNode*)pExpr)->resType.bytes;
        }
        slotId = nextSlotId;
        ++nextSlotId;
      }
    } else {
      slotId = getUnsetSlotId(pIndex->pSlotIdsInfo);
    }

    if (TSDB_CODE_SUCCESS == code) {
      SNode* pTarget = NULL;
      code = createTarget(pNode, pDataBlockDesc->dataBlockId, slotId, &pTarget);
      if (TSDB_CODE_SUCCESS == code) {
        REPLACE_NODE(pTarget);
      }
    }

    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
  }
  return code;
}

/* Adds slots for pList to pDataBlockDesc without a statement name; new slots are not flagged as output. */
static int32_t addDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) {
  return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, NULL, false);
}

232 233 234 235 236 237
/*
 * Single-node variant of addDataBlockSlots().
 *
 * *pNode is put into a temporary one-element list, the list is processed by addDataBlockSlots(), and on
 * success *pNode is set to the list's (replaced) first element. The temporary list is released with
 * nodesClearList() in every case. A NULL pNode or NULL *pNode is a no-op.
 */
static int32_t addDataBlockSlot(SPhysiPlanContext* pCxt, SNode** pNode, SDataBlockDescNode* pDataBlockDesc) {
  if (NULL == pNode || NULL == *pNode) {
    return TSDB_CODE_SUCCESS;
  }

  SNodeList* pList = NULL;
  int32_t    code = nodesListMakeAppend(&pList, *pNode);
  if (TSDB_CODE_SUCCESS == code) {
    code = addDataBlockSlots(pCxt, pList, pDataBlockDesc);
  }
  if (TSDB_CODE_SUCCESS == code) {
    *pNode = nodesListGetNode(pList, 0);
  }
  nodesClearList(pList);
  return code;
}

X
Xiaoyu Wang 已提交
249 250
/* Adds slots for pList to pDataBlockDesc, keyed with pStmtName; new slots are flagged as output. */
static int32_t addDataBlockSlotsForProject(SPhysiPlanContext* pCxt, const char* pStmtName, SNodeList* pList,
                                           SDataBlockDescNode* pDataBlockDesc) {
  return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, pStmtName, true);
}

/* Adds slots for pList to pDataBlockDesc without a statement name; new slots are flagged as output. */
static int32_t pushdownDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) {
  return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, NULL, true);
}

typedef struct SSetSlotIdCxt {
X
Xiaoyu Wang 已提交
259
  int32_t   errCode;
X
Xiaoyu Wang 已提交
260 261 262 263
  SHashObj* pLeftHash;
  SHashObj* pRightHash;
} SSetSlotIdCxt;

264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279
/*
 * Writes pName followed by every key stored in pHash to the planner debug log. A NULL pHash is a no-op.
 *
 * Hash keys are passed around with an explicit length, so each key is copied into a zero-initialised local
 * buffer before it is printed with "%s". The copy is clamped to the buffer size so that an unexpectedly
 * long key can neither overflow the buffer nor leave it unterminated.
 */
static void dumpSlots(const char* pName, SHashObj* pHash) {
  if (NULL == pHash) {
    return;
  }
  planDebug("%s", pName);
  void* pIt = taosHashIterate(pHash, NULL);
  while (NULL != pIt) {
    size_t len = 0;
    char*  pKey = taosHashGetKey(pIt, &len);
    char   name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN] = {0};
    if (len >= sizeof(name)) {
      len = sizeof(name) - 1;  // keep the final '\0' of the zero-initialised buffer
    }
    strncpy(name, pKey, len);
    planDebug("\tslot name = %s", name);
    pIt = taosHashIterate(pHash, pIt);
  }
}

X
Xiaoyu Wang 已提交
280 281 282
/*
 * Expression walker callback that resolves column nodes to (dataBlockId, slotId).
 *
 * pContext is an SSetSlotIdCxt. Every column node whose name is not "*" is looked up by its slot key, first
 * in the left hash and then in the right hash. On a match the column receives the data block id of the
 * entry and the slot id of the entry's first slot, and its children are skipped. When no entry is found,
 * the failure is logged together with the contents of both hashes, errCode is set to
 * TSDB_CODE_PLAN_INTERNAL_ERROR and the walk is aborted. All other nodes are left alone.
 */
static EDealRes doSetSlotId(SNode* pNode, void* pContext) {
  if (QUERY_NODE_COLUMN == nodeType(pNode) && 0 != strcmp(((SColumnNode*)pNode)->colName, "*")) {
    SSetSlotIdCxt* pCxt = (SSetSlotIdCxt*)pContext;
    char           name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN];
    int32_t        len = getSlotKey(pNode, NULL, name);
    SSlotIndex*    pIndex = taosHashGet(pCxt->pLeftHash, name, len);
    if (NULL == pIndex) {
      pIndex = taosHashGet(pCxt->pRightHash, name, len);
    }
    // pIndex is expected to be non-NULL here; a miss indicates a planner bug
    if (NULL == pIndex) {
      planError("doSetSlotId failed, invalid slot name %s", name);
      dumpSlots("left datablock desc", pCxt->pLeftHash);
      dumpSlots("right datablock desc", pCxt->pRightHash);
      pCxt->errCode = TSDB_CODE_PLAN_INTERNAL_ERROR;
      return DEAL_RES_ERROR;
    }
    ((SColumnNode*)pNode)->dataBlockId = pIndex->dataBlockId;
    ((SColumnNode*)pNode)->slotId = ((SSlotIdInfo*)taosArrayGet(pIndex->pSlotIdsInfo, 0))->slotId;
    return DEAL_RES_IGNORE_CHILD;
  }
  return DEAL_RES_CONTINUE;
}

X
Xiaoyu Wang 已提交
304 305
/*
 * Clones pNode and resolves the column nodes of the clone against the slot hashes of one or two data
 * blocks.
 *
 * A negative rightDataBlockId means there is no right data block. On success the clone is returned through
 * pOutput. Returns TSDB_CODE_OUT_OF_MEMORY when cloning yields NULL, or the error recorded by
 * doSetSlotId(), in which case the clone is destroyed.
 */
static int32_t setNodeSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, int16_t rightDataBlockId, SNode* pNode,
                             SNode** pOutput) {
  SNode* pRes = nodesCloneNode(pNode);
  if (NULL == pRes) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SSetSlotIdCxt cxt = {
      .errCode = TSDB_CODE_SUCCESS,
      .pLeftHash = taosArrayGetP(pCxt->pLocationHelper, leftDataBlockId),
      .pRightHash = (rightDataBlockId < 0 ? NULL : taosArrayGetP(pCxt->pLocationHelper, rightDataBlockId))};
  nodesWalkExpr(pRes, doSetSlotId, &cxt);
  if (TSDB_CODE_SUCCESS != cxt.errCode) {
    nodesDestroyNode(pRes);
    return cxt.errCode;
  }

  *pOutput = pRes;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
325 326
/*
 * List variant of setNodeSlotId(): clones pList and resolves the column nodes of the clone against the
 * slot hashes of one or two data blocks.
 *
 * A negative rightDataBlockId means there is no right data block. On success the cloned list is returned
 * through pOutput. Returns TSDB_CODE_OUT_OF_MEMORY when cloning yields NULL, or the error recorded by
 * doSetSlotId(), in which case the cloned list is destroyed.
 */
static int32_t setListSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, int16_t rightDataBlockId,
                             const SNodeList* pList, SNodeList** pOutput) {
  SNodeList* pRes = nodesCloneList(pList);
  if (NULL == pRes) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SSetSlotIdCxt cxt = {
      .errCode = TSDB_CODE_SUCCESS,
      .pLeftHash = taosArrayGetP(pCxt->pLocationHelper, leftDataBlockId),
      .pRightHash = (rightDataBlockId < 0 ? NULL : taosArrayGetP(pCxt->pLocationHelper, rightDataBlockId))};
  nodesWalkExprs(pRes, doSetSlotId, &cxt);
  if (TSDB_CODE_SUCCESS != cxt.errCode) {
    nodesDestroyList(pRes);
    return cxt.errCode;
  }
  *pOutput = pRes;
  return TSDB_CODE_SUCCESS;
}

345 346 347 348 349 350 351 352 353 354 355 356
/*
 * Derives a precision value from the output data block descriptors of the child physical nodes.
 *
 * One child:   that child's precision.
 * Two children: the smaller of the two precision values.
 * Otherwise:   0.
 */
static uint8_t getPrecision(SNodeList* pChildren) {
  switch (LIST_LENGTH(pChildren)) {
    case 1: {
      SPhysiNode* pOnly = (SPhysiNode*)nodesListGetNode(pChildren, 0);
      return pOnly->pOutputDataBlockDesc->precision;
    }
    case 2: {
      SPhysiNode* pFirst = (SPhysiNode*)nodesListGetNode(pChildren, 0);
      SPhysiNode* pSecond = (SPhysiNode*)nodesListGetNode(pChildren, 1);
      uint8_t     firstPrec = pFirst->pOutputDataBlockDesc->precision;
      uint8_t     secondPrec = pSecond->pOutputDataBlockDesc->precision;
      if (firstPrec > secondPrec) {
        return secondPrec;
      }
      return firstPrec;
    }
    default:
      return 0;
  }
}

/*
 * Allocates a physical plan node of the given type and gives it an output data block descriptor built from
 * the targets of pLogicNode, with the descriptor's precision set to `precision`.
 *
 * Returns NULL when the node cannot be allocated or the descriptor cannot be created; in the latter case
 * the node is destroyed first. The concrete error code is not reported to the caller.
 */
static SPhysiNode* makePhysiNode(SPhysiPlanContext* pCxt, uint8_t precision, SLogicNode* pLogicNode, ENodeType type) {
  SPhysiNode* pPhysiNode = (SPhysiNode*)nodesMakeNode(type);
  if (NULL == pPhysiNode) {
    return NULL;
  }

  int32_t code = createDataBlockDesc(pCxt, pLogicNode->pTargets, &pPhysiNode->pOutputDataBlockDesc);
  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode(pPhysiNode);
    return NULL;
  }
  pPhysiNode->pOutputDataBlockDesc->precision = precision;
  return pPhysiNode;
}

/*
 * Copies the conditions of pLogicNode into pPhysiNode->pConditions, with their columns resolved against
 * the physical node's own output data block (no right data block). Does nothing when the logic node has
 * no conditions.
 */
static int32_t setConditionsSlotId(SPhysiPlanContext* pCxt, const SLogicNode* pLogicNode, SPhysiNode* pPhysiNode) {
  if (NULL != pLogicNode->pConditions) {
    return setNodeSlotId(pCxt, pPhysiNode->pOutputDataBlockDesc->dataBlockId, -1, pLogicNode->pConditions,
                         &pPhysiNode->pConditions);
  }
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
379 380 381 382 383 384 385 386 387 388 389 390 391
/*
 * Sort comparator for arrays of SColumnNode pointers, ordering by ascending colId.
 *
 * pLeft and pRight point at array elements, i.e. at SColumnNode pointers. Equal ids compare as 0 so that
 * the comparator gives the same answer regardless of argument order, as sort routines require.
 */
static int32_t colIdCompare(const void* pLeft, const void* pRight) {
  SColumnNode* pLeftCol = *(SColumnNode**)pLeft;
  SColumnNode* pRightCol = *(SColumnNode**)pRight;
  if (pLeftCol->colId == pRightCol->colId) {
    return 0;
  }
  return pLeftCol->colId > pRightCol->colId ? 1 : -1;
}

/*
 * Reorders the nodes of pScanCols in place by ascending colId.
 *
 * The node pointers are copied into a temporary array, sorted with colIdCompare(), and written back into
 * the list cells in sorted order. Returns TSDB_CODE_OUT_OF_MEMORY when the temporary array cannot be
 * created or filled; the list is left unchanged in that case.
 */
static int32_t sortScanCols(SNodeList* pScanCols) {
  SArray* pArray = taosArrayInit(LIST_LENGTH(pScanCols), POINTER_BYTES);
  if (NULL == pArray) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNode* pCol = NULL;
  FOREACH(pCol, pScanCols) {
    // a lost element would make the write-back loop below read past the end of the array
    if (NULL == taosArrayPush(pArray, &pCol)) {
      taosArrayDestroy(pArray);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }
  taosArraySort(pArray, colIdCompare);

  int32_t index = 0;
  FOREACH(pCol, pScanCols) { REPLACE_NODE(taosArrayGetP(pArray, index++)); }
  taosArrayDestroy(pArray);

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
402
/*
 * Gives pScanPhysiNode its own copy of the scan columns, sorted by colId.
 *
 * A NULL pScanCols is a no-op. Returns TSDB_CODE_OUT_OF_MEMORY when cloning yields NULL, otherwise the
 * result of sortScanCols(). pCxt is not used.
 */
static int32_t createScanCols(SPhysiPlanContext* pCxt, SScanPhysiNode* pScanPhysiNode, SNodeList* pScanCols) {
  if (NULL == pScanCols) {
    return TSDB_CODE_SUCCESS;
  }

  pScanPhysiNode->pScanCols = nodesCloneList(pScanCols);
  if (NULL == pScanPhysiNode->pScanCols) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  return sortScanCols(pScanPhysiNode->pScanCols);
}

X
Xiaoyu Wang 已提交
414 415
/*
 * Common tail of all scan node builders.
 *
 * In order: copies and sorts the scan columns, adds slots for them, copies the pseudo columns (if any) and
 * adds slots for them, resolves the conditions against the node's output data block, and copies uid,
 * table type and table name from the logic node. On success pScanPhysiNode is returned through pPhyNode;
 * on any failure pScanPhysiNode is destroyed and the error code is returned.
 */
static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode,
                                           SScanPhysiNode* pScanPhysiNode, SPhysiNode** pPhyNode) {
  int32_t code = createScanCols(pCxt, pScanPhysiNode, pScanLogicNode->pScanCols);
  if (TSDB_CODE_SUCCESS == code) {
    // The data block description must also be set up when no column is scanned, e.g. SELECT COUNT(*) FROM t
    code = addDataBlockSlots(pCxt, pScanPhysiNode->pScanCols, pScanPhysiNode->node.pOutputDataBlockDesc);
  }

  if (TSDB_CODE_SUCCESS == code && NULL != pScanLogicNode->pScanPseudoCols) {
    pScanPhysiNode->pScanPseudoCols = nodesCloneList(pScanLogicNode->pScanPseudoCols);
    if (NULL == pScanPhysiNode->pScanPseudoCols) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = addDataBlockSlots(pCxt, pScanPhysiNode->pScanPseudoCols, pScanPhysiNode->node.pOutputDataBlockDesc);
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pScanLogicNode, (SPhysiNode*)pScanPhysiNode);
  }

  if (TSDB_CODE_SUCCESS == code) {
    pScanPhysiNode->uid = pScanLogicNode->pMeta->uid;
    pScanPhysiNode->tableType = pScanLogicNode->pMeta->tableType;
    memcpy(&pScanPhysiNode->tableName, &pScanLogicNode->tableName, sizeof(SName));
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pScanPhysiNode;
  } else {
    nodesDestroyNode(pScanPhysiNode);
  }

  return code;
}

452 453
/* Fills pNodeAddr from a vgroup: nodeId takes the vgroup id and epSet is copied by value. */
static void vgroupInfoToNodeAddr(const SVgroupInfo* vg, SQueryNodeAddr* pNodeAddr) {
  pNodeAddr->nodeId = vg->vgId;
  pNodeAddr->epSet = vg->epSet;
}

457 458
/*
 * Builds a tag scan physical node for pScanLogicNode.
 *
 * The node is created with the table's precision. The subplan's exec node is taken from the first vgroup
 * of the logic node and appended to pCxt->pExecNodeList. Both steps are skipped when the respective
 * pointer is NULL, matching the guards in createTableScanPhysiNode(). The rest of the setup is done by
 * createScanPhysiNodeFinalize(), whose result is returned. Returns TSDB_CODE_OUT_OF_MEMORY when the node
 * cannot be created.
 */
static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
                                      SPhysiNode** pPhyNode) {
  STagScanPhysiNode* pTagScan = (STagScanPhysiNode*)makePhysiNode(
      pCxt, pScanLogicNode->pMeta->tableInfo.precision, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN);
  if (NULL == pTagScan) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  if (pScanLogicNode->pVgroupList) {
    vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
  }
  if (pCxt->pExecNodeList) {
    taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode);
  }
  return createScanPhysiNodeFinalize(pCxt, pScanLogicNode, (SScanPhysiNode*)pTagScan, pPhyNode);
}

X
Xiaoyu Wang 已提交
469 470 471 472 473
/*
 * Builds a table scan physical node for pScanLogicNode.
 *
 * Copies scan sequence, scan range, ratio, required data, dynamic scan functions and the interval settings
 * from the logic node. When the logic node has a vgroup list, the subplan's exec node and table count are
 * taken from its first vgroup; when the context has an exec node list, the exec node is appended to it.
 * The subplan's full database name is derived from the table name. The rest of the setup is done by
 * createScanPhysiNodeFinalize(), whose result is returned.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY when the node cannot be created or the dynamic scan functions cannot be
 * cloned; in the latter case the node is destroyed.
 */
static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
                                        SPhysiNode** pPhyNode) {
  STableScanPhysiNode* pTableScan =
      (STableScanPhysiNode*)makePhysiNode(pCxt, pScanLogicNode->pMeta->tableInfo.precision, (SLogicNode*)pScanLogicNode,
                                          QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
  if (NULL == pTableScan) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  memcpy(pTableScan->scanSeq, pScanLogicNode->scanSeq, sizeof(pScanLogicNode->scanSeq));
  pTableScan->scanRange = pScanLogicNode->scanRange;
  pTableScan->ratio = pScanLogicNode->ratio;
  if (pScanLogicNode->pVgroupList) {
    vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
    pSubplan->execNodeStat.tableNum = pScanLogicNode->pVgroupList->vgroups[0].numOfTable;
  }
  if (pCxt->pExecNodeList) {
    taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode);
  }
  tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
  pTableScan->dataRequired = pScanLogicNode->dataRequired;
  pTableScan->pDynamicScanFuncs = nodesCloneList(pScanLogicNode->pDynamicScanFuncs);
  // a NULL clone is only an error when there was something to clone
  if (NULL != pScanLogicNode->pDynamicScanFuncs && NULL == pTableScan->pDynamicScanFuncs) {
    nodesDestroyNode(pTableScan);
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pTableScan->interval = pScanLogicNode->interval;
  pTableScan->offset = pScanLogicNode->offset;
  pTableScan->sliding = pScanLogicNode->sliding;
  pTableScan->intervalUnit = pScanLogicNode->intervalUnit;
  pTableScan->slidingUnit = pScanLogicNode->slidingUnit;

  return createScanPhysiNodeFinalize(pCxt, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode);
}

X
Xiaoyu Wang 已提交
504 505 506 507 508
/*
 * Builds a system table scan physical node for pScanLogicNode.
 *
 * Copies showRewrite from the logic node and the account id and mgmt endpoint set from the plan context.
 * For the TSDB_INS_TABLE_USER_TABLES table the subplan's exec node is taken from the first vgroup and
 * appended to the exec node list; for every other system table an address made of MNODE_HANDLE and the
 * mgmt endpoint set is appended instead. The subplan's full database name is derived from the table name.
 * The rest of the setup is done by createScanPhysiNodeFinalize(), whose result is returned. Returns
 * TSDB_CODE_OUT_OF_MEMORY when the node cannot be created.
 */
static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
                                              SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) {
  SSystemTableScanPhysiNode* pScan =
      (SSystemTableScanPhysiNode*)makePhysiNode(pCxt, pScanLogicNode->pMeta->tableInfo.precision,
                                                (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN);
  if (NULL == pScan) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pScan->showRewrite = pScanLogicNode->showRewrite;
  pScan->accountId = pCxt->pPlanCxt->acctId;
  if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLES)) {
    vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
    taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode);
  } else {
    SQueryNodeAddr addr = {.nodeId = MNODE_HANDLE, .epSet = pCxt->pPlanCxt->mgmtEpSet};
    taosArrayPush(pCxt->pExecNodeList, &addr);
  }
  pScan->mgmtEpSet = pCxt->pPlanCxt->mgmtEpSet;
  tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);

  return createScanPhysiNodeFinalize(pCxt, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
}

X
Xiaoyu Wang 已提交
528 529
/*
 * Builds a stream scan physical node: the node is built exactly like a table scan node and, on success,
 * its node type is changed to QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN. Returns the result of
 * createTableScanPhysiNode().
 */
static int32_t createStreamScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
                                         SPhysiNode** pPhyNode) {
  int32_t res = createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
  if (res == TSDB_CODE_SUCCESS) {
    ENodeType type = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
    setNodeType(*pPhyNode, type);
  }
  return res;
}

X
Xiaoyu Wang 已提交
538 539
/*
 * Builds the physical scan node matching the scan type of pScanLogicNode by dispatching to the tag, table,
 * system table or stream scan builder. Returns that builder's result, or TSDB_CODE_FAILED for a scan type
 * that is not handled here.
 */
static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
                                   SPhysiNode** pPhyNode) {
  switch (pScanLogicNode->scanType) {
    case SCAN_TYPE_TAG:
      return createTagScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
    case SCAN_TYPE_TABLE:
      return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
    case SCAN_TYPE_SYSTEM_TABLE:
      return createSystemTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
    case SCAN_TYPE_STREAM:
      return createStreamScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
    default:
      break;
  }
  return TSDB_CODE_FAILED;
}

X
Xiaoyu Wang 已提交
555 556 557 558
/*
 * Builds a join physical node from pJoinLogicNode.
 *
 * The first and second entries of pChildren are used as the left and right inputs. The ON conditions (if
 * any) and the targets are copied with their columns resolved against both input data blocks, slots for
 * the targets are added to the join's output data block, and the node's conditions are resolved against
 * that output block. On success the node is returned through pPhyNode; on failure it is destroyed and the
 * error code is returned. Returns TSDB_CODE_OUT_OF_MEMORY when the node cannot be created.
 */
static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode,
                                   SPhysiNode** pPhyNode) {
  SJoinPhysiNode* pJoin = (SJoinPhysiNode*)makePhysiNode(pCxt, getPrecision(pChildren), (SLogicNode*)pJoinLogicNode,
                                                         QUERY_NODE_PHYSICAL_PLAN_JOIN);
  if (NULL == pJoin) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SDataBlockDescNode* pLeftDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc;
  SDataBlockDescNode* pRightDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 1))->pOutputDataBlockDesc;
  int32_t             code = TSDB_CODE_SUCCESS;

  pJoin->joinType = pJoinLogicNode->joinType;
  if (NULL != pJoinLogicNode->pOnConditions) {
    code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pOnConditions,
                         &pJoin->pOnConditions);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->node.pTargets,
                         &pJoin->pTargets);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = addDataBlockSlots(pCxt, pJoin->pTargets, pJoin->node.pOutputDataBlockDesc);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin);
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pJoin;
  } else {
    nodesDestroyNode(pJoin);
  }

  return code;
}

typedef struct SRewritePrecalcExprsCxt {
X
Xiaoyu Wang 已提交
593 594 595
  int32_t    errCode;
  int32_t    planNodeId;
  int32_t    rewriteId;
X
Xiaoyu Wang 已提交
596 597 598 599 600
  SNodeList* pPrecalcExprs;
} SRewritePrecalcExprsCxt;

/*
 * Moves the expression *pNode into the pre-calculation list and replaces it with a column reference.
 *
 * A clone of *pNode is appended to pCxt->pPrecalcExprs. The original node is destroyed and *pNode is set
 * to a new column node that has the clone's result type and whose column name is the clone's alias. When
 * the clone has no alias, one of the form "#expr_<planNodeId>_<rewriteId>" is generated first.
 *
 * The column node is allocated before the clone is appended: once the list holds the clone it must not be
 * destroyed here any more, so every allocation that can fail has to happen before the append.
 *
 * On any failure pCxt->errCode is set to TSDB_CODE_OUT_OF_MEMORY, *pNode is left untouched and
 * DEAL_RES_ERROR is returned; otherwise DEAL_RES_IGNORE_CHILD is returned.
 *
 * NOTE(review): neither planNodeId nor rewriteId is changed in the code visible here, so several unnamed
 * expressions rewritten through one context would get the same generated alias - confirm whether that is
 * intended.
 */
static EDealRes collectAndRewrite(SRewritePrecalcExprsCxt* pCxt, SNode** pNode) {
  SNode* pExpr = nodesCloneNode(*pNode);
  if (NULL == pExpr) {
    pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
    return DEAL_RES_ERROR;
  }
  SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
  if (NULL == pCol) {
    pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
    nodesDestroyNode(pExpr);
    return DEAL_RES_ERROR;
  }
  if (nodesListAppend(pCxt->pPrecalcExprs, pExpr)) {
    pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
    nodesDestroyNode(pExpr);
    nodesDestroyNode((SNode*)pCol);
    return DEAL_RES_ERROR;
  }

  SExprNode* pRewrittenExpr = (SExprNode*)pExpr;
  pCol->node.resType = pRewrittenExpr->resType;
  if ('\0' == pRewrittenExpr->aliasName[0]) {
    snprintf(pRewrittenExpr->aliasName, sizeof(pRewrittenExpr->aliasName), "#expr_%d_%d", pCxt->planNodeId,
             pCxt->rewriteId);
  }
  strcpy(pCol->colName, pRewrittenExpr->aliasName);
  nodesDestroyNode(*pNode);
  *pNode = (SNode*)pCol;
  return DEAL_RES_IGNORE_CHILD;
}

630 631 632 633 634
/*
 * Wraps the value node *pNode in an assignment operator node.
 *
 * The new operator has a QUERY_NODE_LEFT_VALUE node on its left, the original value node on its right,
 * operator type OP_TYPE_ASSIGN, and takes over the value's result type and alias. On success *pNode points
 * at the operator. Returns TSDB_CODE_OUT_OF_MEMORY when a node cannot be allocated; *pNode is unchanged in
 * that case. pCxt is not used.
 */
static int32_t rewriteValueToOperator(SRewritePrecalcExprsCxt* pCxt, SNode** pNode) {
  SOperatorNode* pOper = (SOperatorNode*)nodesMakeNode(QUERY_NODE_OPERATOR);
  if (NULL == pOper) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pOper->pLeft = nodesMakeNode(QUERY_NODE_LEFT_VALUE);
  if (NULL == pOper->pLeft) {
    nodesDestroyNode(pOper);
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  SValueNode* pVal = (SValueNode*)*pNode;
  pOper->node.resType = pVal->node.resType;
  strcpy(pOper->node.aliasName, pVal->node.aliasName);
  pOper->opType = OP_TYPE_ASSIGN;
  pOper->pRight = *pNode;
  *pNode = (SNode*)pOper;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
649 650 651
/*
 * Expression rewriter callback that pulls pre-calculable expressions out of the tree.
 *
 * pContext is an SRewritePrecalcExprsCxt.
 *   - Value nodes are first wrapped in an assignment operator and then collected.
 *   - Operator and logic condition nodes are collected as they are.
 *   - Function nodes are collected only when fmIsScalarFunc() accepts their function id.
 * "Collected" means handled by collectAndRewrite(). Every other node lets the walk continue into its
 * children.
 */
static EDealRes doRewritePrecalcExprs(SNode** pNode, void* pContext) {
  SRewritePrecalcExprsCxt* pCxt = (SRewritePrecalcExprsCxt*)pContext;
  switch (nodeType(*pNode)) {
    case QUERY_NODE_VALUE: {
      pCxt->errCode = rewriteValueToOperator(pCxt, pNode);
      if (TSDB_CODE_SUCCESS != pCxt->errCode) {
        return DEAL_RES_ERROR;
      }
      return collectAndRewrite(pCxt, pNode);
    }
    case QUERY_NODE_OPERATOR:
    case QUERY_NODE_LOGIC_CONDITION: {
      return collectAndRewrite(pCxt, pNode);
    }
    case QUERY_NODE_FUNCTION: {
      if (fmIsScalarFunc(((SFunctionNode*)(*pNode))->funcId)) {
        return collectAndRewrite(pCxt, pNode);
      }
    }
    // fall through: a non-scalar function is handled like any other node
    default:
      break;
  }
  return DEAL_RES_CONTINUE;
}

X
Xiaoyu Wang 已提交
674 675
// Clone every expression of pList into *pRewrittenList and run doRewritePrecalcExprs over the
// clones, using *pPrecalcExprs as the rewriter's collection list.
//
// Both output lists are created on demand when they are NULL on entry, so the same
// *pPrecalcExprs can be shared by several calls. For a grouping-set node only the first
// entry of its parameter list is cloned.
//
// A NULL pList is a no-op that returns TSDB_CODE_SUCCESS. When the rewrite fails, or leaves
// the collection list empty, *pPrecalcExprs is released through DESTORY_LIST. The caller
// owns whatever is left in both output lists, also on failure.
static int32_t rewritePrecalcExprs(SPhysiPlanContext* pCxt, SNodeList* pList, SNodeList** pPrecalcExprs,
                                   SNodeList** pRewrittenList) {
  if (NULL == pList) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == *pPrecalcExprs) {
    *pPrecalcExprs = nodesMakeList();
    if (NULL == *pPrecalcExprs) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }
  if (NULL == *pRewrittenList) {
    *pRewrittenList = nodesMakeList();
    if (NULL == *pRewrittenList) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  SNode* pNode = NULL;
  FOREACH(pNode, pList) {
    SNode* pNew = NULL;
    if (QUERY_NODE_GROUPING_SET == nodeType(pNode)) {
      pNew = nodesCloneNode(nodesListGetNode(((SGroupingSetNode*)pNode)->pParameterList, 0));
    } else {
      pNew = nodesCloneNode(pNode);
    }
    if (NULL == pNew) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    // Strict append releases pNew itself when the append fails, so the clone cannot leak.
    if (TSDB_CODE_SUCCESS != nodesListStrictAppend(*pRewrittenList, pNew)) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  SRewritePrecalcExprsCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pPrecalcExprs = *pPrecalcExprs};
  nodesRewriteExprs(*pRewrittenList, doRewritePrecalcExprs, &cxt);
  if (0 == LIST_LENGTH(cxt.pPrecalcExprs) || TSDB_CODE_SUCCESS != cxt.errCode) {
    DESTORY_LIST(*pPrecalcExprs);
  }
  return cxt.errCode;
}

X
Xiaoyu Wang 已提交
715 716
// Single-expression wrapper around rewritePrecalcExprs.
//
// pNode is put into a temporary one-element list, rewritten, and the rewritten clone is
// handed to the caller through *pRewritten (the caller owns it). A NULL pNode is a no-op
// that returns TSDB_CODE_SUCCESS and leaves *pRewritten untouched.
static int32_t rewritePrecalcExpr(SPhysiPlanContext* pCxt, SNode* pNode, SNodeList** pPrecalcExprs,
                                  SNode** pRewritten) {
  if (NULL == pNode) {
    return TSDB_CODE_SUCCESS;
  }

  SNodeList* pList = NULL;
  int32_t    code = nodesListMakeAppend(&pList, pNode);
  SNodeList* pRewrittenList = NULL;
  if (TSDB_CODE_SUCCESS == code) {
    code = rewritePrecalcExprs(pCxt, pList, pPrecalcExprs, &pRewrittenList);
  }

  // pNode still belongs to the caller: drop only the list container.
  nodesClearList(pList);

  if (TSDB_CODE_SUCCESS == code) {
    // Ownership of the clone moves to the caller, so only the container is dropped.
    *pRewritten = nodesListGetNode(pRewrittenList, 0);
    nodesClearList(pRewrittenList);
  } else {
    // Nobody will receive the clones; destroy them with the list instead of leaking them.
    nodesDestroyList(pRewrittenList);
  }
  return code;
}

X
Xiaoyu Wang 已提交
735 736 737 738
// Build the aggregate physical node for pAggLogicNode.
//
// Group keys and aggregate functions are first rewritten by rewritePrecalcExprs. The
// collected pre-calculated expressions are slot-bound against the first child's output
// block and pushed down into it; group keys and aggregate functions are slot-bound and
// added to this node's own output block. The new node is returned through pPhyNode on
// success and destroyed on failure.
static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SAggLogicNode* pAggLogicNode,
                                  SPhysiNode** pPhyNode) {
  SAggPhysiNode* pAggNode = (SAggPhysiNode*)makePhysiNode(pCxt, getPrecision(pChildren), (SLogicNode*)pAggLogicNode,
                                                          QUERY_NODE_PHYSICAL_PLAN_AGG);
  if (NULL == pAggNode) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // Temporary rewritten copies; all three are released before returning.
  SNodeList* pPreExprs = NULL;
  SNodeList* pKeys = NULL;
  SNodeList* pFuncs = NULL;

  int32_t code = rewritePrecalcExprs(pCxt, pAggLogicNode->pGroupKeys, &pPreExprs, &pKeys);
  if (TSDB_CODE_SUCCESS == code) {
    code = rewritePrecalcExprs(pCxt, pAggLogicNode->pAggFuncs, &pPreExprs, &pFuncs);
  }

  SPhysiNode*         pFirstChild = (SPhysiNode*)nodesListGetNode(pChildren, 0);
  SDataBlockDescNode* pChildDesc = pFirstChild->pOutputDataBlockDesc;

  // push down expression to pOutputDataBlockDesc of child node
  if (TSDB_CODE_SUCCESS == code && NULL != pPreExprs) {
    code = setListSlotId(pCxt, pChildDesc->dataBlockId, -1, pPreExprs, &pAggNode->pExprs);
  }
  if (TSDB_CODE_SUCCESS == code && NULL != pPreExprs) {
    code = pushdownDataBlockSlots(pCxt, pAggNode->pExprs, pChildDesc);
  }

  // group keys become part of this node's output
  if (TSDB_CODE_SUCCESS == code && NULL != pKeys) {
    code = setListSlotId(pCxt, pChildDesc->dataBlockId, -1, pKeys, &pAggNode->pGroupKeys);
  }
  if (TSDB_CODE_SUCCESS == code && NULL != pKeys) {
    code = addDataBlockSlots(pCxt, pAggNode->pGroupKeys, pAggNode->node.pOutputDataBlockDesc);
  }

  // aggregate functions become part of this node's output
  if (TSDB_CODE_SUCCESS == code && NULL != pFuncs) {
    code = setListSlotId(pCxt, pChildDesc->dataBlockId, -1, pFuncs, &pAggNode->pAggFuncs);
  }
  if (TSDB_CODE_SUCCESS == code && NULL != pFuncs) {
    code = addDataBlockSlots(pCxt, pAggNode->pAggFuncs, pAggNode->node.pOutputDataBlockDesc);
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pAggLogicNode, (SPhysiNode*)pAggNode);
  }

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode(pAggNode);
  } else {
    *pPhyNode = (SPhysiNode*)pAggNode;
  }

  nodesDestroyList(pPreExprs);
  nodesDestroyList(pKeys);
  nodesDestroyList(pFuncs);

  return code;
}

X
Xiaoyu Wang 已提交
791 792 793 794
// Build the projection physical node for pProjectLogicNode.
//
// limit/offset/slimit/soffset are copied from the logic node. The projection list is
// slot-bound against the first child's output block and then added to this node's output
// block via addDataBlockSlotsForProject. The node is returned through pPhyNode on success
// and destroyed on failure.
static int32_t createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
                                      SProjectLogicNode* pProjectLogicNode, SPhysiNode** pPhyNode) {
  SProjectPhysiNode* pProjNode = (SProjectPhysiNode*)makePhysiNode(
      pCxt, getPrecision(pChildren), (SLogicNode*)pProjectLogicNode, QUERY_NODE_PHYSICAL_PLAN_PROJECT);
  if (NULL == pProjNode) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pProjNode->limit = pProjectLogicNode->limit;
  pProjNode->offset = pProjectLogicNode->offset;
  pProjNode->slimit = pProjectLogicNode->slimit;
  pProjNode->soffset = pProjectLogicNode->soffset;

  SPhysiNode* pFirstChild = (SPhysiNode*)nodesListGetNode(pChildren, 0);
  int32_t     code = setListSlotId(pCxt, pFirstChild->pOutputDataBlockDesc->dataBlockId, -1,
                                   pProjectLogicNode->pProjections, &pProjNode->pProjections);
  if (TSDB_CODE_SUCCESS == code) {
    code = addDataBlockSlotsForProject(pCxt, pProjectLogicNode->stmtName, pProjNode->pProjections,
                                       pProjNode->node.pOutputDataBlockDesc);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pProjectLogicNode, (SPhysiNode*)pProjNode);
  }

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode(pProjNode);
    return code;
  }

  *pPhyNode = (SPhysiNode*)pProjNode;
  return code;
}

X
Xiaoyu Wang 已提交
823 824 825 826
// Build a plain exchange physical node: only the source group id is copied from the
// logic node. Returns TSDB_CODE_OUT_OF_MEMORY when the node cannot be made.
static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode,
                                         SPhysiNode** pPhyNode) {
  SExchangePhysiNode* pExchNode = (SExchangePhysiNode*)makePhysiNode(
      pCxt, pExchangeLogicNode->precision, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE);
  if (NULL != pExchNode) {
    pExchNode->srcGroupId = pExchangeLogicNode->srcGroupId;
    *pPhyNode = (SPhysiNode*)pExchNode;
    return TSDB_CODE_SUCCESS;
  }
  return TSDB_CODE_OUT_OF_MEMORY;
}
X
Xiaoyu Wang 已提交
836 837
// Build a stream-scan physical node that stands in for an exchange logic node
// (chosen by createExchangePhysiNode when the plan context is a stream query).
//
// The scan columns are a clone of the exchange node's targets, sorted with sortScanCols
// and then added to the node's output block. The node is returned through pPhyNode on
// success and destroyed on failure.
static int32_t createStreamScanPhysiNodeByExchange(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode,
                                                   SPhysiNode** pPhyNode) {
  SScanPhysiNode* pScan = (SScanPhysiNode*)makePhysiNode(
      pCxt, pExchangeLogicNode->precision, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
  if (NULL == pScan) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  pScan->pScanCols = nodesCloneList(pExchangeLogicNode->node.pTargets);
  if (NULL == pScan->pScanCols) {
    code = TSDB_CODE_OUT_OF_MEMORY;
  }

  // Sort exactly once; the previous code repeated this step back-to-back.
  if (TSDB_CODE_SUCCESS == code) {
    code = sortScanCols(pScan->pScanCols);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = addDataBlockSlots(pCxt, pScan->pScanCols, pScan->node.pOutputDataBlockDesc);
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pScan;
  } else {
    nodesDestroyNode(pScan);
  }

  return code;
}

X
Xiaoyu Wang 已提交
871 872
// Dispatch for exchange logic nodes: stream queries get a stream-scan node,
// everything else gets a regular exchange node.
static int32_t createExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode,
                                       SPhysiNode** pPhyNode) {
  if (!pCxt->pPlanCxt->streamQuery) {
    return doCreateExchangePhysiNode(pCxt, pExchangeLogicNode, pPhyNode);
  }
  return createStreamScanPhysiNodeByExchange(pCxt, pExchangeLogicNode, pPhyNode);
}

X
Xiaoyu Wang 已提交
880 881
// Common tail of the window physical-node constructors.
//
// Rewrites the window functions with rewritePrecalcExprs, slot-binds the collected
// pre-calculated expressions against the first child's output block and adds them to it,
// binds the pTspk node, then binds the functions and adds them to the window's own output
// block. triggerType and watermark are copied from the logic node.
//
// pWindow is returned through pPhyNode on success and destroyed on failure.
static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWinodwPhysiNode* pWindow,
                                             SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
  SNodeList* pPrecalcExprs = NULL;
  SNodeList* pFuncs = NULL;
  int32_t    code = rewritePrecalcExprs(pCxt, pWindowLogicNode->pFuncs, &pPrecalcExprs, &pFuncs);

  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  // push down expression to pOutputDataBlockDesc of child node
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pWindow->pExprs);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlots(pCxt, pWindow->pExprs, pChildTupe);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pWindowLogicNode->pTspk, &pWindow->pTspk);
  }

  if (TSDB_CODE_SUCCESS == code && NULL != pFuncs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncs, &pWindow->pFuncs);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlots(pCxt, pWindow->pFuncs, pWindow->node.pOutputDataBlockDesc);
    }
  }

  pWindow->triggerType = pWindowLogicNode->triggerType;
  pWindow->watermark = pWindowLogicNode->watermark;

  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pWindow;
  } else {
    nodesDestroyNode(pWindow);
  }

  // The rewritten lists are temporaries owned by this function (createAggPhysiNode releases
  // its equivalent lists the same way after setListSlotId); free them on every path.
  nodesDestroyList(pPrecalcExprs);
  nodesDestroyList(pFuncs);

  return code;
}

X
Xiaoyu Wang 已提交
918 919 920
// Build an interval-window physical node (the stream variant when the plan context is a
// stream query), copy the interval parameters from the logic node, and let
// createWindowPhysiNodeFinalize complete it.
static int32_t createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
                                       SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
  SIntervalPhysiNode* pIntervalNode = (SIntervalPhysiNode*)makePhysiNode(
      pCxt, getPrecision(pChildren), (SLogicNode*)pWindowLogicNode,
      (pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL : QUERY_NODE_PHYSICAL_PLAN_INTERVAL));
  if (NULL == pIntervalNode) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // interval definition
  pIntervalNode->interval = pWindowLogicNode->interval;
  pIntervalNode->intervalUnit = pWindowLogicNode->intervalUnit;
  pIntervalNode->offset = pWindowLogicNode->offset;
  // sliding definition
  pIntervalNode->sliding = pWindowLogicNode->sliding;
  pIntervalNode->slidingUnit = pWindowLogicNode->slidingUnit;

  return createWindowPhysiNodeFinalize(pCxt, pChildren, &pIntervalNode->window, pWindowLogicNode, pPhyNode);
}

X
Xiaoyu Wang 已提交
936 937 938 939
// Build a session-window physical node, copy the session gap from the logic node, and
// let createWindowPhysiNodeFinalize complete it.
static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
                                            SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
  SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)makePhysiNode(
      pCxt, getPrecision(pChildren), (SLogicNode*)pWindowLogicNode, QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW);
  if (NULL != pSessionNode) {
    pSessionNode->gap = pWindowLogicNode->sessionGap;
    return createWindowPhysiNodeFinalize(pCxt, pChildren, &pSessionNode->window, pWindowLogicNode, pPhyNode);
  }
  return TSDB_CODE_OUT_OF_MEMORY;
}

X
Xiaoyu Wang 已提交
949 950 951 952
// Build a state-window physical node.
//
// The state expression is rewritten with rewritePrecalcExpr; collected pre-calculated
// expressions are slot-bound against the first child's output block and added to it. The
// rewritten state key is slot-bound into pState->pStateKey and added to the window's
// output block. createWindowPhysiNodeFinalize then completes the node.
//
// On failure before the finalize step the node is destroyed here.
static int32_t createStateWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
                                          SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
  SStateWinodwPhysiNode* pState = (SStateWinodwPhysiNode*)makePhysiNode(
      pCxt, getPrecision(pChildren), (SLogicNode*)pWindowLogicNode, QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW);
  if (NULL == pState) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNodeList* pPrecalcExprs = NULL;
  SNode*     pStateKey = NULL;
  int32_t    code = rewritePrecalcExpr(pCxt, pWindowLogicNode->pStateExpr, &pPrecalcExprs, &pStateKey);

  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  // push down expression to pOutputDataBlockDesc of child node
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pState->window.pExprs);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlots(pCxt, pState->window.pExprs, pChildTupe);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pStateKey, &pState->pStateKey);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlot(pCxt, &pState->pStateKey, pState->window.node.pOutputDataBlockDesc);
    }
  }

  // Both rewrite results are temporaries owned by this function; release them on every
  // path now that the slot-bound versions live in pState.
  // NOTE(review): relies on setListSlotId/setNodeSlotId not taking ownership of their
  // input, as the list cleanup in createAggPhysiNode implies - confirm.
  nodesDestroyList(pPrecalcExprs);
  nodesDestroyNode(pStateKey);

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode(pState);
    return code;
  }

  return createWindowPhysiNodeFinalize(pCxt, pChildren, &pState->window, pWindowLogicNode, pPhyNode);
}

X
Xiaoyu Wang 已提交
985 986
// Dispatch on the logic node's window type to the matching constructor.
// Unknown window types yield TSDB_CODE_FAILED.
static int32_t createWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode,
                                     SPhysiNode** pPhyNode) {
  int32_t code = TSDB_CODE_FAILED;
  switch (pWindowLogicNode->winType) {
    case WINDOW_TYPE_INTERVAL:
      code = createIntervalPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
      break;
    case WINDOW_TYPE_SESSION:
      code = createSessionWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
      break;
    case WINDOW_TYPE_STATE:
      code = createStateWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
      break;
    default:
      break;
  }
  return code;
}

X
Xiaoyu Wang 已提交
1000 1001 1002 1003
// Build the sort physical node for pSortLogicNode.
//
// Sort keys are rewritten with rewritePrecalcExprs; collected pre-calculated expressions
// are slot-bound against the first child's output block and added to it. The rewritten
// sort keys and the logic node's targets are slot-bound, and the targets are added to
// this node's output block. The node is returned through pPhyNode on success and
// destroyed on failure.
static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SSortLogicNode* pSortLogicNode,
                                   SPhysiNode** pPhyNode) {
  SSortPhysiNode* pSort = (SSortPhysiNode*)makePhysiNode(pCxt, getPrecision(pChildren), (SLogicNode*)pSortLogicNode,
                                                         QUERY_NODE_PHYSICAL_PLAN_SORT);
  if (NULL == pSort) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNodeList* pPrecalcExprs = NULL;
  SNodeList* pSortKeys = NULL;
  int32_t    code = rewritePrecalcExprs(pCxt, pSortLogicNode->pSortKeys, &pPrecalcExprs, &pSortKeys);

  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  // push down expression to pOutputDataBlockDesc of child node
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pSort->pExprs);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlots(pCxt, pSort->pExprs, pChildTupe);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pSortKeys, &pSort->pSortKeys);
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pSortLogicNode->node.pTargets, &pSort->pTargets);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlots(pCxt, pSort->pTargets, pSort->node.pOutputDataBlockDesc);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pSort;
  } else {
    nodesDestroyNode(pSort);
  }

  // The rewritten lists are temporaries owned by this function (createAggPhysiNode releases
  // its equivalent lists the same way after setListSlotId); free them on every path.
  nodesDestroyList(pPrecalcExprs);
  nodesDestroyList(pSortKeys);

  return code;
}

X
Xiaoyu Wang 已提交
1041 1042 1043 1044
// Build the partition physical node for pPartLogicNode.
//
// Partition keys are rewritten with rewritePrecalcExprs; collected pre-calculated
// expressions are slot-bound against the first child's output block and added to it. The
// rewritten partition keys and the logic node's targets are slot-bound, and the targets
// are added to this node's output block. The node is returned through pPhyNode on
// success and destroyed on failure.
static int32_t createPartitionPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
                                        SPartitionLogicNode* pPartLogicNode, SPhysiNode** pPhyNode) {
  SPartitionPhysiNode* pPart = (SPartitionPhysiNode*)makePhysiNode(
      pCxt, getPrecision(pChildren), (SLogicNode*)pPartLogicNode, QUERY_NODE_PHYSICAL_PLAN_PARTITION);
  if (NULL == pPart) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNodeList* pPrecalcExprs = NULL;
  SNodeList* pPartitionKeys = NULL;
  int32_t    code = rewritePrecalcExprs(pCxt, pPartLogicNode->pPartitionKeys, &pPrecalcExprs, &pPartitionKeys);

  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  // push down expression to pOutputDataBlockDesc of child node
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pPart->pExprs);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlots(pCxt, pPart->pExprs, pChildTupe);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPartitionKeys, &pPart->pPartitionKeys);
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPartLogicNode->node.pTargets, &pPart->pTargets);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlots(pCxt, pPart->pTargets, pPart->node.pOutputDataBlockDesc);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pPart;
  } else {
    nodesDestroyNode(pPart);
  }

  // The rewritten lists are temporaries owned by this function (createAggPhysiNode releases
  // its equivalent lists the same way after setListSlotId); free them on every path.
  nodesDestroyList(pPrecalcExprs);
  nodesDestroyList(pPartitionKeys);

  return code;
}

X
Xiaoyu Wang 已提交
1082 1083 1084 1085 1086 1087 1088 1089
// Build the fill physical node for pFillNode.
//
// mode and timeRange are copied from the logic node. The targets are slot-bound against
// the first child's output block and added to this node's output block. pWStartTs is
// always cloned (a NULL clone is reported as out-of-memory); pValues is cloned only when
// the logic node has one. The node is returned through pPhyNode on success and destroyed
// on failure.
static int32_t createFillPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SFillLogicNode* pFillNode,
                                   SPhysiNode** pPhyNode) {
  SFillPhysiNode* pFillPhysi = (SFillPhysiNode*)makePhysiNode(pCxt, getPrecision(pChildren), (SLogicNode*)pFillNode,
                                                              QUERY_NODE_PHYSICAL_PLAN_FILL);
  if (NULL == pFillPhysi) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pFillPhysi->mode = pFillNode->mode;
  pFillPhysi->timeRange = pFillNode->timeRange;

  SPhysiNode*         pFirstChild = (SPhysiNode*)nodesListGetNode(pChildren, 0);
  SDataBlockDescNode* pChildDesc = pFirstChild->pOutputDataBlockDesc;

  int32_t code =
      setListSlotId(pCxt, pChildDesc->dataBlockId, -1, pFillNode->node.pTargets, &pFillPhysi->pTargets);
  if (TSDB_CODE_SUCCESS == code) {
    code = addDataBlockSlots(pCxt, pFillPhysi->pTargets, pFillPhysi->node.pOutputDataBlockDesc);
  }

  if (TSDB_CODE_SUCCESS == code) {
    pFillPhysi->pWStartTs = nodesCloneNode(pFillNode->pWStartTs);
    code = (NULL == pFillPhysi->pWStartTs) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS;
  }

  if (TSDB_CODE_SUCCESS == code && NULL != pFillNode->pValues) {
    pFillPhysi->pValues = nodesCloneNode(pFillNode->pValues);
    code = (NULL == pFillPhysi->pValues) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS;
  }

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode(pFillPhysi);
    return code;
  }

  *pPhyNode = (SPhysiNode*)pFillPhysi;
  return code;
}

X
Xiaoyu Wang 已提交
1122 1123
// Dispatch on the logic node's type to the matching physical-node constructor.
// Scan nodes receive the subplan, exchange nodes take no children, and every other
// constructor receives the already-built child list. Unhandled node types yield
// TSDB_CODE_FAILED.
static int32_t doCreatePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, SSubplan* pSubplan,
                                 SNodeList* pChildren, SPhysiNode** pPhyNode) {
  int32_t code = TSDB_CODE_FAILED;

  switch (nodeType(pLogicNode)) {
    case QUERY_NODE_LOGIC_PLAN_SCAN:
      code = createScanPhysiNode(pCxt, pSubplan, (SScanLogicNode*)pLogicNode, pPhyNode);
      break;
    case QUERY_NODE_LOGIC_PLAN_JOIN:
      code = createJoinPhysiNode(pCxt, pChildren, (SJoinLogicNode*)pLogicNode, pPhyNode);
      break;
    case QUERY_NODE_LOGIC_PLAN_AGG:
      code = createAggPhysiNode(pCxt, pChildren, (SAggLogicNode*)pLogicNode, pPhyNode);
      break;
    case QUERY_NODE_LOGIC_PLAN_PROJECT:
      code = createProjectPhysiNode(pCxt, pChildren, (SProjectLogicNode*)pLogicNode, pPhyNode);
      break;
    case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
      code = createExchangePhysiNode(pCxt, (SExchangeLogicNode*)pLogicNode, pPhyNode);
      break;
    case QUERY_NODE_LOGIC_PLAN_WINDOW:
      code = createWindowPhysiNode(pCxt, pChildren, (SWindowLogicNode*)pLogicNode, pPhyNode);
      break;
    case QUERY_NODE_LOGIC_PLAN_SORT:
      code = createSortPhysiNode(pCxt, pChildren, (SSortLogicNode*)pLogicNode, pPhyNode);
      break;
    case QUERY_NODE_LOGIC_PLAN_PARTITION:
      code = createPartitionPhysiNode(pCxt, pChildren, (SPartitionLogicNode*)pLogicNode, pPhyNode);
      break;
    case QUERY_NODE_LOGIC_PLAN_FILL:
      code = createFillPhysiNode(pCxt, pChildren, (SFillLogicNode*)pLogicNode, pPhyNode);
      break;
    default:
      break;
  }

  return code;
}

X
Xiaoyu Wang 已提交
1150 1151
// Recursively build the physical node tree for pLogicNode.
//
// Each logic child is converted first and collected in a child list; the node itself is
// then created by doCreatePhysiNode with that list. On success the list becomes the new
// node's pChildren and every child gets its pParent set. On failure the child list is
// destroyed.
static int32_t createPhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, SSubplan* pSubplan,
                               SPhysiNode** pPhyNode) {
  SNodeList* pChildren = nodesMakeList();
  if (NULL == pChildren) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  SNode* pLogicChild;
  FOREACH(pLogicChild, pLogicNode->pChildren) {
    SPhysiNode* pChild = NULL;
    code = createPhysiNode(pCxt, (SLogicNode*)pLogicChild, pSubplan, &pChild);
    if (TSDB_CODE_SUCCESS == code) {
      code = nodesListStrictAppend(pChildren, pChild);
    }
    // Stop at the first failure: otherwise a later child that succeeds would overwrite
    // the error code and the node would be built with a child missing.
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = doCreatePhysiNode(pCxt, pLogicNode, pSubplan, pChildren, pPhyNode);
  }

  if (TSDB_CODE_SUCCESS == code) {
    (*pPhyNode)->pChildren = pChildren;
    SNode* pChild;
    FOREACH(pChild, (*pPhyNode)->pChildren) { ((SPhysiNode*)pChild)->pParent = (*pPhyNode); }
  } else {
    nodesDestroyList(pChildren);
  }

  return code;
}

X
Xiaoyu Wang 已提交
1183
// Build a data-inserter sink from pBlocks: table count and size are copied, and the data
// pointer is swapped (TSWAP) between the new sink and pBlocks.
static int32_t createDataInserter(SPhysiPlanContext* pCxt, SVgDataBlocks* pBlocks, SDataSinkNode** pSink) {
  SDataInserterNode* pInsertSink = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_INSERT);
  if (NULL != pInsertSink) {
    pInsertSink->numOfTables = pBlocks->numOfTables;
    pInsertSink->size = pBlocks->size;
    TSWAP(pInsertSink->pData, pBlocks->pData);

    *pSink = (SDataSinkNode*)pInsertSink;
    return TSDB_CODE_SUCCESS;
  }
  return TSDB_CODE_OUT_OF_MEMORY;
}

X
Xiaoyu Wang 已提交
1197
// Build a data-dispatcher sink whose input block descriptor is a clone of pRoot's output
// block descriptor. Returns TSDB_CODE_OUT_OF_MEMORY if the sink or the clone cannot be made.
static int32_t createDataDispatcher(SPhysiPlanContext* pCxt, const SPhysiNode* pRoot, SDataSinkNode** pSink) {
  SDataDispatcherNode* pDispatchSink = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_DISPATCH);
  if (NULL == pDispatchSink) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pDispatchSink->sink.pInputDataBlockDesc = nodesCloneNode(pRoot->pOutputDataBlockDesc);
  if (NULL != pDispatchSink->sink.pInputDataBlockDesc) {
    *pSink = (SDataSinkNode*)pDispatchSink;
    return TSDB_CODE_SUCCESS;
  }

  nodesDestroyNode(pDispatchSink);
  return TSDB_CODE_OUT_OF_MEMORY;
}

X
Xiaoyu Wang 已提交
1213
// Allocate a physical subplan and copy id, subplan type and level from the logic subplan.
// Returns NULL when the node cannot be made.
static SSubplan* makeSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan) {
  SSubplan* pNewSubplan = nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
  if (NULL != pNewSubplan) {
    pNewSubplan->id = pLogicSubplan->id;
    pNewSubplan->subplanType = pLogicSubplan->subplanType;
    pNewSubplan->level = pLogicSubplan->level;
  }
  return pNewSubplan;
}

X
Xiaoyu Wang 已提交
1224
// Convert one logic subplan into a physical subplan.
//
// Modify subplans take message type and endpoint set from the vnode-modify logic node,
// record their exec node in pCxt->pExecNodeList and get a data-inserter sink. All other
// subplans use TDMT_VND_QUERY, get their physical node tree built, and - unless the plan
// is a stream or topic query - a data-dispatcher sink.
//
// The subplan is returned through pPhysiSubplan on success and destroyed on failure.
static int32_t createPhysiSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SSubplan** pPhysiSubplan) {
  SSubplan* pNewSubplan = makeSubplan(pCxt, pLogicSubplan);
  if (NULL == pNewSubplan) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  if (SUBPLAN_TYPE_MODIFY != pLogicSubplan->subplanType) {
    pNewSubplan->msgType = TDMT_VND_QUERY;
    code = createPhysiNode(pCxt, pLogicSubplan->pNode, pNewSubplan, &pNewSubplan->pNode);
    if (TSDB_CODE_SUCCESS == code && !pCxt->pPlanCxt->streamQuery && !pCxt->pPlanCxt->topicQuery) {
      code = createDataDispatcher(pCxt, pNewSubplan->pNode, &pNewSubplan->pDataSink);
    }
  } else {
    SVnodeModifLogicNode* pModifyNode = (SVnodeModifLogicNode*)pLogicSubplan->pNode;
    pNewSubplan->msgType = pModifyNode->msgType;
    pNewSubplan->execNode.epSet = pModifyNode->pVgDataBlocks->vg.epSet;
    taosArrayPush(pCxt->pExecNodeList, &pNewSubplan->execNode);
    code = createDataInserter(pCxt, pModifyNode->pVgDataBlocks, &pNewSubplan->pDataSink);
  }

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode(pNewSubplan);
    return code;
  }

  *pPhysiSubplan = pNewSubplan;
  return code;
}

X
Xiaoyu Wang 已提交
1255 1256 1257 1258 1259 1260 1261 1262 1263
// Allocate an empty physical query plan with an empty subplan list and the query id taken
// from the plan context. Returns NULL if either allocation fails.
static SQueryPlan* makeQueryPhysiPlan(SPhysiPlanContext* pCxt) {
  SQueryPlan* pQueryPlan = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN);
  if (NULL == pQueryPlan) {
    return NULL;
  }

  pQueryPlan->pSubplans = nodesMakeList();
  if (NULL != pQueryPlan->pSubplans) {
    pQueryPlan->queryId = pCxt->pPlanCxt->queryId;
    return pQueryPlan;
  }

  nodesDestroyNode(pQueryPlan);
  return NULL;
}

static int32_t pushSubplan(SPhysiPlanContext* pCxt, SNodeptr pSubplan, int32_t level, SNodeList* pSubplans) {
  SNodeListNode* pGroup;
  if (level >= LIST_LENGTH(pSubplans)) {
    pGroup = nodesMakeNode(QUERY_NODE_NODE_LIST);
X
bugfix  
Xiaoyu Wang 已提交
1273 1274 1275 1276 1277 1278
    if (NULL == pGroup) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pSubplans, pGroup)) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
1279 1280 1281 1282 1283
  } else {
    pGroup = nodesListGetNode(pSubplans, level);
  }
  if (NULL == pGroup->pNodeList) {
    pGroup->pNodeList = nodesMakeList();
X
bugfix  
Xiaoyu Wang 已提交
1284 1285 1286
    if (NULL == pGroup->pNodeList) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
1287
  }
X
bugfix  
Xiaoyu Wang 已提交
1288
  return nodesListStrictAppend(pGroup->pNodeList, pSubplan);
1289 1290
}

X
Xiaoyu Wang 已提交
1291 1292
// Recursively convert pLogicSubplan and its children into physical subplans.
//
// The new subplan is registered in pQueryPlan at the logic subplan's level (the subplan
// counter is bumped whenever that registration is attempted), linked with pParent in both
// directions when a parent is given, and then every logic child is processed with the new
// subplan as its parent. Processing stops at the first error.
static int32_t buildPhysiPlan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SSubplan* pParent,
                              SQueryPlan* pQueryPlan) {
  SSubplan* pNewSubplan = NULL;
  int32_t   code = createPhysiSubplan(pCxt, pLogicSubplan, &pNewSubplan);

  if (TSDB_CODE_SUCCESS == code) {
    code = pushSubplan(pCxt, pNewSubplan, pLogicSubplan->level, pQueryPlan->pSubplans);
    ++(pQueryPlan->numOfSubplans);
  }

  if (TSDB_CODE_SUCCESS != code) {
    // NOTE(review): if pushSubplan failed inside nodesListStrictAppend, confirm the subplan
    // has not already been released there before it is destroyed again here.
    nodesDestroyNode(pNewSubplan);
    return code;
  }

  if (NULL != pParent) {
    code = nodesListMakeAppend(&pParent->pChildren, pNewSubplan);
    if (TSDB_CODE_SUCCESS == code) {
      code = nodesListMakeAppend(&pNewSubplan->pParents, pParent);
    }
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }
  }

  SNode* pLogicChild = NULL;
  FOREACH(pLogicChild, pLogicSubplan->pChildren) {
    code = buildPhysiPlan(pCxt, (SLogicSubplan*)pLogicChild, pNewSubplan, pQueryPlan);
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
  }

  return code;
}

X
Xiaoyu Wang 已提交
1326 1327 1328 1329
// Build the whole physical plan: one buildPhysiPlan pass per top-level logic subplan,
// stopping at the first error. The plan is returned through pPhysiPlan on success and
// destroyed on failure.
static int32_t doCreatePhysiPlan(SPhysiPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPhysiPlan) {
  SQueryPlan* pQueryPlan = makeQueryPhysiPlan(pCxt);
  if (NULL == pQueryPlan) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  SNode*  pTopSubplan = NULL;
  FOREACH(pTopSubplan, pLogicPlan->pTopSubplans) {
    code = buildPhysiPlan(pCxt, (SLogicSubplan*)pTopSubplan, NULL, pQueryPlan);
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
  }

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode(pQueryPlan);
    return code;
  }

  *pPhysiPlan = pQueryPlan;
  return code;
}
X
Xiaoyu Wang 已提交
1350

X
Xiaoyu Wang 已提交
1351
static void destoryLocationHash(void* p) {
X
Xiaoyu Wang 已提交
1352
  SHashObj*   pHash = *(SHashObj**)p;
X
bugfix  
Xiaoyu Wang 已提交
1353 1354 1355 1356 1357
  SSlotIndex* pIndex = taosHashIterate(pHash, NULL);
  while (NULL != pIndex) {
    taosArrayDestroy(pIndex->pSlotIdsInfo);
    pIndex = taosHashIterate(pHash, pIndex);
  }
X
Xiaoyu Wang 已提交
1358 1359 1360 1361 1362 1363 1364
  taosHashCleanup(pHash);
}

// Release the context's location-helper array; each element is torn down by
// destoryLocationHash. Other context fields are left untouched.
static void destoryPhysiPlanContext(SPhysiPlanContext* pCxt) {
  SArray* pLocationHelper = pCxt->pLocationHelper;
  taosArrayDestroyEx(pLocationHelper, destoryLocationHash);
}

1365 1366 1367 1368 1369
// Fill pPlan->explainInfo from the AST root: disabled unless the root is an EXPLAIN
// statement, otherwise analyze/static mode plus the verbose and ratio options.
static void setExplainInfo(SPlanContext* pCxt, SQueryPlan* pPlan) {
  if (QUERY_NODE_EXPLAIN_STMT != nodeType(pCxt->pAstRoot)) {
    pPlan->explainInfo.mode = EXPLAIN_MODE_DISABLE;
    return;
  }

  SExplainStmt* pExplain = (SExplainStmt*)pCxt->pAstRoot;
  if (pExplain->analyze) {
    pPlan->explainInfo.mode = EXPLAIN_MODE_ANALYZE;
  } else {
    pPlan->explainInfo.mode = EXPLAIN_MODE_STATIC;
  }
  pPlan->explainInfo.verbose = pExplain->pOptions->verbose;
  pPlan->explainInfo.ratio = pExplain->pOptions->ratio;
}

X
Xiaoyu Wang 已提交
1376
// Entry point: turn a logic plan into a physical plan.
//
// A fresh physical-plan context is set up around pCxt and pExecNodeList. When the query
// policy is QUERY_POLICY_VNODE the exec-node list is cleared first. On success the explain
// info of the resulting plan is filled in. The context's location helper is released on
// every path once it has been allocated.
int32_t createPhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan, SArray* pExecNodeList) {
  SArray* pLocationHelper = taosArrayInit(32, POINTER_BYTES);
  if (NULL == pLocationHelper) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SPhysiPlanContext physiCxt = {.pPlanCxt = pCxt,
                                .errCode = TSDB_CODE_SUCCESS,
                                .nextDataBlockId = 0,
                                .pLocationHelper = pLocationHelper,
                                .pExecNodeList = pExecNodeList};

  if (QUERY_POLICY_VNODE == tsQueryPolicy) {
    taosArrayClear(pExecNodeList);
  }

  const int32_t code = doCreatePhysiPlan(&physiCxt, pLogicPlan, pPlan);
  if (TSDB_CODE_SUCCESS == code) {
    setExplainInfo(pCxt, *pPlan);
  }

  destoryPhysiPlanContext(&physiCxt);
  return code;
}