planPhysiCreater.c 61.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

X
Xiaoyu Wang 已提交
16
#include "planInt.h"
X
Xiaoyu Wang 已提交
17

X
Xiaoyu Wang 已提交
18
#include "catalog.h"
X
Xiaoyu Wang 已提交
19
#include "functionMgt.h"
20
#include "systable.h"
X
Xiaoyu Wang 已提交
21
#include "tglobal.h"
X
Xiaoyu Wang 已提交
22

X
bugfix  
Xiaoyu Wang 已提交
23 24
typedef struct SSlotIdInfo {
  int16_t slotId;
X
Xiaoyu Wang 已提交
25
  bool    set;
X
bugfix  
Xiaoyu Wang 已提交
26 27
} SSlotIdInfo;

X
Xiaoyu Wang 已提交
28 29
typedef struct SSlotIndex {
  int16_t dataBlockId;
X
Xiaoyu Wang 已提交
30
  SArray* pSlotIdsInfo;  // duplicate name slot
X
Xiaoyu Wang 已提交
31 32 33
} SSlotIndex;

typedef struct SPhysiPlanContext {
X
Xiaoyu Wang 已提交
34
  SPlanContext* pPlanCxt;
X
Xiaoyu Wang 已提交
35 36 37
  int32_t       errCode;
  int16_t       nextDataBlockId;
  SArray*       pLocationHelper;
38
  SArray*       pExecNodeList;  // SArray<SQueryNodeLoad>
X
Xiaoyu Wang 已提交
39 40
} SPhysiPlanContext;

X
Xiaoyu Wang 已提交
41
static int32_t getSlotKey(SNode* pNode, const char* pStmtName, char* pKey) {
X
Xiaoyu Wang 已提交
42 43
  if (QUERY_NODE_COLUMN == nodeType(pNode)) {
    SColumnNode* pCol = (SColumnNode*)pNode;
44 45 46 47 48 49
    if (NULL != pStmtName) {
      if ('\0' != pStmtName[0]) {
        return sprintf(pKey, "%s.%s", pStmtName, pCol->node.aliasName);
      } else {
        return sprintf(pKey, "%s", pCol->node.aliasName);
      }
X
Xiaoyu Wang 已提交
50
    }
X
Xiaoyu Wang 已提交
51 52 53 54 55
    if ('\0' == pCol->tableAlias[0]) {
      return sprintf(pKey, "%s", pCol->colName);
    }
    return sprintf(pKey, "%s.%s", pCol->tableAlias, pCol->colName);
  }
X
Xiaoyu Wang 已提交
56

X
Xiaoyu Wang 已提交
57
  if (NULL != pStmtName && '\0' != pStmtName[0]) {
X
Xiaoyu Wang 已提交
58 59
    return sprintf(pKey, "%s.%s", pStmtName, ((SExprNode*)pNode)->aliasName);
  }
X
Xiaoyu Wang 已提交
60 61 62
  return sprintf(pKey, "%s", ((SExprNode*)pNode)->aliasName);
}

63 64
static SNode* createSlotDesc(SPhysiPlanContext* pCxt, const char* pName, const SNode* pNode, int16_t slotId,
                             bool output, bool reserve) {
X
Xiaoyu Wang 已提交
65
  SSlotDescNode* pSlot = (SSlotDescNode*)nodesMakeNode(QUERY_NODE_SLOT_DESC);
X
bugfix  
Xiaoyu Wang 已提交
66 67 68
  if (NULL == pSlot) {
    return NULL;
  }
69
  strcpy(pSlot->name, pName);
X
Xiaoyu Wang 已提交
70 71
  pSlot->slotId = slotId;
  pSlot->dataType = ((SExprNode*)pNode)->resType;
X
Xiaoyu Wang 已提交
72
  pSlot->reserve = reserve;
X
bugfix  
Xiaoyu Wang 已提交
73
  pSlot->output = output;
X
Xiaoyu Wang 已提交
74 75 76
  return (SNode*)pSlot;
}

X
bugfix  
Xiaoyu Wang 已提交
77
static int32_t createTarget(SNode* pNode, int16_t dataBlockId, int16_t slotId, SNode** pOutput) {
X
Xiaoyu Wang 已提交
78 79
  STargetNode* pTarget = (STargetNode*)nodesMakeNode(QUERY_NODE_TARGET);
  if (NULL == pTarget) {
X
bugfix  
Xiaoyu Wang 已提交
80
    return TSDB_CODE_OUT_OF_MEMORY;
X
Xiaoyu Wang 已提交
81
  }
X
bugfix  
Xiaoyu Wang 已提交
82

X
Xiaoyu Wang 已提交
83 84 85
  pTarget->dataBlockId = dataBlockId;
  pTarget->slotId = slotId;
  pTarget->pExpr = pNode;
X
bugfix  
Xiaoyu Wang 已提交
86 87 88

  *pOutput = (SNode*)pTarget;
  return TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
89 90
}

X
bugfix  
Xiaoyu Wang 已提交
91
static int32_t putSlotToHashImpl(int16_t dataBlockId, int16_t slotId, const char* pName, int32_t len, SHashObj* pHash) {
X
bugfix  
Xiaoyu Wang 已提交
92 93
  SSlotIndex* pIndex = taosHashGet(pHash, pName, len);
  if (NULL != pIndex) {
X
Xiaoyu Wang 已提交
94
    SSlotIdInfo info = {.slotId = slotId, .set = false};
X
bugfix  
Xiaoyu Wang 已提交
95 96 97 98
    taosArrayPush(pIndex->pSlotIdsInfo, &info);
    return TSDB_CODE_SUCCESS;
  }

X
Xiaoyu Wang 已提交
99
  SSlotIndex index = {.dataBlockId = dataBlockId, .pSlotIdsInfo = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SSlotIdInfo))};
X
bugfix  
Xiaoyu Wang 已提交
100 101 102
  if (NULL == index.pSlotIdsInfo) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
103
  SSlotIdInfo info = {.slotId = slotId, .set = false};
X
bugfix  
Xiaoyu Wang 已提交
104
  taosArrayPush(index.pSlotIdsInfo, &info);
X
bugfix  
Xiaoyu Wang 已提交
105 106 107
  return taosHashPut(pHash, pName, len, &index, sizeof(SSlotIndex));
}

108 109
static int32_t putSlotToHash(const char* pName, int16_t dataBlockId, int16_t slotId, SNode* pNode, SHashObj* pHash) {
  return putSlotToHashImpl(dataBlockId, slotId, pName, strlen(pName), pHash);
X
bugfix  
Xiaoyu Wang 已提交
110 111
}

X
Xiaoyu Wang 已提交
112 113
static int32_t createDataBlockDescHash(SPhysiPlanContext* pCxt, int32_t capacity, int16_t dataBlockId,
                                       SHashObj** pDescHash) {
X
bugfix  
Xiaoyu Wang 已提交
114
  SHashObj* pHash = taosHashInit(capacity, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
X
bugfix  
Xiaoyu Wang 已提交
115 116 117 118 119 120 121 122 123 124 125 126
  if (NULL == pHash) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  if (NULL == taosArrayInsert(pCxt->pLocationHelper, dataBlockId, &pHash)) {
    taosHashCleanup(pHash);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  *pDescHash = pHash;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
127 128
static int32_t buildDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc,
                                   SHashObj* pHash) {
X
bugfix  
Xiaoyu Wang 已提交
129
  pDataBlockDesc->pSlots = nodesMakeList();
X
Xiaoyu Wang 已提交
130
  if (NULL == pDataBlockDesc->pSlots) {
X
bugfix  
Xiaoyu Wang 已提交
131 132
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
133

X
bugfix  
Xiaoyu Wang 已提交
134 135
  int32_t code = TSDB_CODE_SUCCESS;
  int16_t slotId = 0;
X
Xiaoyu Wang 已提交
136
  SNode*  pNode = NULL;
X
bugfix  
Xiaoyu Wang 已提交
137
  FOREACH(pNode, pList) {
138 139 140
    char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN];
    getSlotKey(pNode, NULL, name);
    code = nodesListStrictAppend(pDataBlockDesc->pSlots, createSlotDesc(pCxt, name, pNode, slotId, true, false));
X
bugfix  
Xiaoyu Wang 已提交
141
    if (TSDB_CODE_SUCCESS == code) {
142
      code = putSlotToHash(name, pDataBlockDesc->dataBlockId, slotId, pNode, pHash);
X
bugfix  
Xiaoyu Wang 已提交
143 144
    }
    if (TSDB_CODE_SUCCESS == code) {
145 146
      pDataBlockDesc->totalRowSize += ((SExprNode*)pNode)->resType.bytes;
      pDataBlockDesc->outputRowSize += ((SExprNode*)pNode)->resType.bytes;
X
bugfix  
Xiaoyu Wang 已提交
147 148 149
      ++slotId;
    } else {
      break;
X
Xiaoyu Wang 已提交
150
    }
X
bugfix  
Xiaoyu Wang 已提交
151 152 153 154 155
  }
  return code;
}

static int32_t createDataBlockDesc(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode** pDataBlockDesc) {
156
  SDataBlockDescNode* pDesc = (SDataBlockDescNode*)nodesMakeNode(QUERY_NODE_DATABLOCK_DESC);
X
bugfix  
Xiaoyu Wang 已提交
157 158 159 160 161 162
  if (NULL == pDesc) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pDesc->dataBlockId = pCxt->nextDataBlockId++;

  SHashObj* pHash = NULL;
X
Xiaoyu Wang 已提交
163
  int32_t   code = createDataBlockDescHash(pCxt, LIST_LENGTH(pList), pDesc->dataBlockId, &pHash);
X
bugfix  
Xiaoyu Wang 已提交
164 165 166 167 168 169
  if (TSDB_CODE_SUCCESS == code) {
    code = buildDataBlockSlots(pCxt, pList, pDesc, pHash);
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pDataBlockDesc = pDesc;
X
Xiaoyu Wang 已提交
170
  } else {
171
    nodesDestroyNode((SNode*)pDesc);
X
Xiaoyu Wang 已提交
172
  }
X
bugfix  
Xiaoyu Wang 已提交
173 174 175 176

  return code;
}

X
bugfix  
Xiaoyu Wang 已提交
177 178 179 180 181 182 183 184 185 186 187 188
static int16_t getUnsetSlotId(const SArray* pSlotIdsInfo) {
  int32_t size = taosArrayGetSize(pSlotIdsInfo);
  for (int32_t i = 0; i < size; ++i) {
    SSlotIdInfo* pInfo = taosArrayGet(pSlotIdsInfo, i);
    if (!pInfo->set) {
      pInfo->set = true;
      return pInfo->slotId;
    }
  }
  return ((SSlotIdInfo*)taosArrayGet(pSlotIdsInfo, 0))->slotId;
}

X
Xiaoyu Wang 已提交
189
static int32_t addDataBlockSlotsImpl(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc,
X
Xiaoyu Wang 已提交
190
                                     const char* pStmtName, bool output, bool reserve) {
191 192 193 194
  if (NULL == pList) {
    return TSDB_CODE_SUCCESS;
  }

X
Xiaoyu Wang 已提交
195
  int32_t   code = TSDB_CODE_SUCCESS;
X
bugfix  
Xiaoyu Wang 已提交
196
  SHashObj* pHash = taosArrayGetP(pCxt->pLocationHelper, pDataBlockDesc->dataBlockId);
X
Xiaoyu Wang 已提交
197 198
  int16_t   nextSlotId = taosHashGetSize(pHash), slotId = 0;
  SNode*    pNode = NULL;
X
Xiaoyu Wang 已提交
199
  FOREACH(pNode, pList) {
X
Xiaoyu Wang 已提交
200 201 202
    SNode*      pExpr = QUERY_NODE_ORDER_BY_EXPR == nodeType(pNode) ? ((SOrderByExprNode*)pNode)->pExpr : pNode;
    char        name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN] = {0};
    int32_t     len = getSlotKey(pExpr, pStmtName, name);
X
bugfix  
Xiaoyu Wang 已提交
203 204
    SSlotIndex* pIndex = taosHashGet(pHash, name, len);
    if (NULL == pIndex) {
205 206
      code =
          nodesListStrictAppend(pDataBlockDesc->pSlots, createSlotDesc(pCxt, name, pExpr, nextSlotId, output, reserve));
X
bugfix  
Xiaoyu Wang 已提交
207 208 209
      if (TSDB_CODE_SUCCESS == code) {
        code = putSlotToHashImpl(pDataBlockDesc->dataBlockId, nextSlotId, name, len, pHash);
      }
210 211 212 213
      pDataBlockDesc->totalRowSize += ((SExprNode*)pExpr)->resType.bytes;
      if (output) {
        pDataBlockDesc->outputRowSize += ((SExprNode*)pExpr)->resType.bytes;
      }
X
bugfix  
Xiaoyu Wang 已提交
214 215 216
      slotId = nextSlotId;
      ++nextSlotId;
    } else {
X
bugfix  
Xiaoyu Wang 已提交
217
      slotId = getUnsetSlotId(pIndex->pSlotIdsInfo);
X
bugfix  
Xiaoyu Wang 已提交
218
    }
219

X
bugfix  
Xiaoyu Wang 已提交
220 221 222 223 224 225 226
    if (TSDB_CODE_SUCCESS == code) {
      SNode* pTarget = NULL;
      code = createTarget(pNode, pDataBlockDesc->dataBlockId, slotId, &pTarget);
      if (TSDB_CODE_SUCCESS == code) {
        REPLACE_NODE(pTarget);
      }
    }
X
Xiaoyu Wang 已提交
227

X
bugfix  
Xiaoyu Wang 已提交
228 229 230
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
X
Xiaoyu Wang 已提交
231
  }
X
bugfix  
Xiaoyu Wang 已提交
232 233 234 235
  return code;
}

static int32_t addDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) {
X
Xiaoyu Wang 已提交
236
  return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, NULL, false, false);
X
Xiaoyu Wang 已提交
237 238
}

239 240 241 242 243 244
static int32_t addDataBlockSlot(SPhysiPlanContext* pCxt, SNode** pNode, SDataBlockDescNode* pDataBlockDesc) {
  if (NULL == pNode || NULL == *pNode) {
    return TSDB_CODE_SUCCESS;
  }

  SNodeList* pList = NULL;
X
Xiaoyu Wang 已提交
245
  int32_t    code = nodesListMakeAppend(&pList, *pNode);
246 247 248 249 250 251 252 253 254 255
  if (TSDB_CODE_SUCCESS == code) {
    code = addDataBlockSlots(pCxt, pList, pDataBlockDesc);
  }
  if (TSDB_CODE_SUCCESS == code) {
    *pNode = nodesListGetNode(pList, 0);
  }
  nodesClearList(pList);
  return code;
}

X
Xiaoyu Wang 已提交
256 257
static int32_t addDataBlockSlotsForProject(SPhysiPlanContext* pCxt, const char* pStmtName, SNodeList* pList,
                                           SDataBlockDescNode* pDataBlockDesc) {
X
Xiaoyu Wang 已提交
258
  return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, pStmtName, true, false);
X
bugfix  
Xiaoyu Wang 已提交
259 260 261
}

static int32_t pushdownDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) {
X
Xiaoyu Wang 已提交
262
  return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, NULL, true, true);
X
Xiaoyu Wang 已提交
263 264 265
}

typedef struct SSetSlotIdCxt {
X
Xiaoyu Wang 已提交
266
  int32_t   errCode;
X
Xiaoyu Wang 已提交
267 268 269 270
  SHashObj* pLeftHash;
  SHashObj* pRightHash;
} SSetSlotIdCxt;

271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286
static void dumpSlots(const char* pName, SHashObj* pHash) {
  if (NULL == pHash) {
    return;
  }
  planDebug("%s", pName);
  void* pIt = taosHashIterate(pHash, NULL);
  while (NULL != pIt) {
    size_t len = 0;
    char*  pKey = taosHashGetKey(pIt, &len);
    char   name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN] = {0};
    strncpy(name, pKey, len);
    planDebug("\tslot name = %s", name);
    pIt = taosHashIterate(pHash, pIt);
  }
}

X
Xiaoyu Wang 已提交
287 288 289
static EDealRes doSetSlotId(SNode* pNode, void* pContext) {
  if (QUERY_NODE_COLUMN == nodeType(pNode) && 0 != strcmp(((SColumnNode*)pNode)->colName, "*")) {
    SSetSlotIdCxt* pCxt = (SSetSlotIdCxt*)pContext;
X
Xiaoyu Wang 已提交
290 291 292
    char           name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN];
    int32_t        len = getSlotKey(pNode, NULL, name);
    SSlotIndex*    pIndex = taosHashGet(pCxt->pLeftHash, name, len);
X
Xiaoyu Wang 已提交
293 294 295 296
    if (NULL == pIndex) {
      pIndex = taosHashGet(pCxt->pRightHash, name, len);
    }
    // pIndex is definitely not NULL, otherwise it is a bug
X
bugfix  
Xiaoyu Wang 已提交
297
    if (NULL == pIndex) {
298
      planError("doSetSlotId failed, invalid slot name %s", name);
299 300
      dumpSlots("left datablock desc", pCxt->pLeftHash);
      dumpSlots("right datablock desc", pCxt->pRightHash);
301
      pCxt->errCode = TSDB_CODE_PLAN_INTERNAL_ERROR;
X
bugfix  
Xiaoyu Wang 已提交
302 303
      return DEAL_RES_ERROR;
    }
X
Xiaoyu Wang 已提交
304
    ((SColumnNode*)pNode)->dataBlockId = pIndex->dataBlockId;
X
bugfix  
Xiaoyu Wang 已提交
305
    ((SColumnNode*)pNode)->slotId = ((SSlotIdInfo*)taosArrayGet(pIndex->pSlotIdsInfo, 0))->slotId;
X
Xiaoyu Wang 已提交
306 307 308 309 310
    return DEAL_RES_IGNORE_CHILD;
  }
  return DEAL_RES_CONTINUE;
}

X
Xiaoyu Wang 已提交
311 312
static int32_t setNodeSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, int16_t rightDataBlockId, SNode* pNode,
                             SNode** pOutput) {
X
Xiaoyu Wang 已提交
313
  SNode* pRes = nodesCloneNode(pNode);
X
Xiaoyu Wang 已提交
314 315 316 317 318
  if (NULL == pRes) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SSetSlotIdCxt cxt = {
X
Xiaoyu Wang 已提交
319 320 321
      .errCode = TSDB_CODE_SUCCESS,
      .pLeftHash = taosArrayGetP(pCxt->pLocationHelper, leftDataBlockId),
      .pRightHash = (rightDataBlockId < 0 ? NULL : taosArrayGetP(pCxt->pLocationHelper, rightDataBlockId))};
X
Xiaoyu Wang 已提交
322
  nodesWalkExpr(pRes, doSetSlotId, &cxt);
X
Xiaoyu Wang 已提交
323 324
  if (TSDB_CODE_SUCCESS != cxt.errCode) {
    nodesDestroyNode(pRes);
X
Xiaoyu Wang 已提交
325
    return cxt.errCode;
X
Xiaoyu Wang 已提交
326
  }
X
Xiaoyu Wang 已提交
327 328 329

  *pOutput = pRes;
  return TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
330 331
}

X
Xiaoyu Wang 已提交
332 333
static int32_t setListSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, int16_t rightDataBlockId,
                             const SNodeList* pList, SNodeList** pOutput) {
X
Xiaoyu Wang 已提交
334
  SNodeList* pRes = nodesCloneList(pList);
X
Xiaoyu Wang 已提交
335 336 337 338 339
  if (NULL == pRes) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SSetSlotIdCxt cxt = {
X
Xiaoyu Wang 已提交
340 341 342
      .errCode = TSDB_CODE_SUCCESS,
      .pLeftHash = taosArrayGetP(pCxt->pLocationHelper, leftDataBlockId),
      .pRightHash = (rightDataBlockId < 0 ? NULL : taosArrayGetP(pCxt->pLocationHelper, rightDataBlockId))};
X
Xiaoyu Wang 已提交
343
  nodesWalkExprs(pRes, doSetSlotId, &cxt);
X
Xiaoyu Wang 已提交
344 345
  if (TSDB_CODE_SUCCESS != cxt.errCode) {
    nodesDestroyList(pRes);
X
Xiaoyu Wang 已提交
346
    return cxt.errCode;
X
Xiaoyu Wang 已提交
347
  }
X
Xiaoyu Wang 已提交
348 349
  *pOutput = pRes;
  return TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
350 351
}

X
Xiaoyu Wang 已提交
352
static SPhysiNode* makePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, ENodeType type) {
X
Xiaoyu Wang 已提交
353
  SPhysiNode* pPhysiNode = (SPhysiNode*)nodesMakeNode(type);
X
Xiaoyu Wang 已提交
354 355 356
  if (NULL == pPhysiNode) {
    return NULL;
  }
X
bugfix  
Xiaoyu Wang 已提交
357

358 359
  TSWAP(pPhysiNode->pLimit, pLogicNode->pLimit);
  TSWAP(pPhysiNode->pSlimit, pLogicNode->pSlimit);
360

X
bugfix  
Xiaoyu Wang 已提交
361 362
  int32_t code = createDataBlockDesc(pCxt, pLogicNode->pTargets, &pPhysiNode->pOutputDataBlockDesc);
  if (TSDB_CODE_SUCCESS != code) {
363
    nodesDestroyNode((SNode*)pPhysiNode);
X
Xiaoyu Wang 已提交
364 365
    return NULL;
  }
X
Xiaoyu Wang 已提交
366
  pPhysiNode->pOutputDataBlockDesc->precision = pLogicNode->precision;
X
Xiaoyu Wang 已提交
367 368 369 370 371
  return pPhysiNode;
}

static int32_t setConditionsSlotId(SPhysiPlanContext* pCxt, const SLogicNode* pLogicNode, SPhysiNode* pPhysiNode) {
  if (NULL != pLogicNode->pConditions) {
X
Xiaoyu Wang 已提交
372 373
    return setNodeSlotId(pCxt, pPhysiNode->pOutputDataBlockDesc->dataBlockId, -1, pLogicNode->pConditions,
                         &pPhysiNode->pConditions);
X
Xiaoyu Wang 已提交
374 375 376 377
  }
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
378 379 380 381 382 383 384 385 386 387 388 389 390
static int32_t colIdCompare(const void* pLeft, const void* pRight) {
  SColumnNode* pLeftCol = *(SColumnNode**)pLeft;
  SColumnNode* pRightCol = *(SColumnNode**)pRight;
  return pLeftCol->colId > pRightCol->colId ? 1 : -1;
}

static int32_t sortScanCols(SNodeList* pScanCols) {
  SArray* pArray = taosArrayInit(LIST_LENGTH(pScanCols), POINTER_BYTES);
  if (NULL == pArray) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNode* pCol = NULL;
X
Xiaoyu Wang 已提交
391
  FOREACH(pCol, pScanCols) { taosArrayPush(pArray, &pCol); }
X
Xiaoyu Wang 已提交
392 393 394
  taosArraySort(pArray, colIdCompare);

  int32_t index = 0;
X
Xiaoyu Wang 已提交
395
  FOREACH(pCol, pScanCols) { REPLACE_NODE(taosArrayGetP(pArray, index++)); }
X
Xiaoyu Wang 已提交
396 397 398 399 400
  taosArrayDestroy(pArray);

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
401
static int32_t createScanCols(SPhysiPlanContext* pCxt, SScanPhysiNode* pScanPhysiNode, SNodeList* pScanCols) {
402 403 404 405
  if (NULL == pScanCols) {
    return TSDB_CODE_SUCCESS;
  }

406 407 408
  pScanPhysiNode->pScanCols = nodesCloneList(pScanCols);
  if (NULL == pScanPhysiNode->pScanCols) {
    return TSDB_CODE_OUT_OF_MEMORY;
409
  }
X
bugfix  
Xiaoyu Wang 已提交
410
  return sortScanCols(pScanPhysiNode->pScanCols);
411 412
}

X
Xiaoyu Wang 已提交
413
static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
X
Xiaoyu Wang 已提交
414
                                           SScanPhysiNode* pScanPhysiNode, SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
415 416 417
  int32_t code = createScanCols(pCxt, pScanPhysiNode, pScanLogicNode->pScanCols);
  if (TSDB_CODE_SUCCESS == code) {
    // Data block describe also needs to be set without scanning column, such as SELECT COUNT(*) FROM t
X
bugfix  
Xiaoyu Wang 已提交
418
    code = addDataBlockSlots(pCxt, pScanPhysiNode->pScanCols, pScanPhysiNode->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
419
  }
420 421 422 423 424 425 426 427 428 429 430 431

  if (TSDB_CODE_SUCCESS == code && NULL != pScanLogicNode->pScanPseudoCols) {
    pScanPhysiNode->pScanPseudoCols = nodesCloneList(pScanLogicNode->pScanPseudoCols);
    if (NULL == pScanPhysiNode->pScanPseudoCols) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = addDataBlockSlots(pCxt, pScanPhysiNode->pScanPseudoCols, pScanPhysiNode->node.pOutputDataBlockDesc);
  }

X
Xiaoyu Wang 已提交
432 433 434
  if (TSDB_CODE_SUCCESS == code) {
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pScanLogicNode, (SPhysiNode*)pScanPhysiNode);
  }
435

X
Xiaoyu Wang 已提交
436
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
437
    pScanPhysiNode->uid = pScanLogicNode->tableId;
X
Xiaoyu Wang 已提交
438
    pScanPhysiNode->suid = pScanLogicNode->stableId;
X
Xiaoyu Wang 已提交
439
    pScanPhysiNode->tableType = pScanLogicNode->tableType;
X
Xiaoyu Wang 已提交
440
    memcpy(&pScanPhysiNode->tableName, &pScanLogicNode->tableName, sizeof(SName));
X
Xiaoyu Wang 已提交
441 442 443 444 445 446
    if (NULL != pScanLogicNode->pTagCond) {
      pSubplan->pTagCond = nodesCloneNode(pScanLogicNode->pTagCond);
      if (NULL == pSubplan->pTagCond) {
        code = TSDB_CODE_OUT_OF_MEMORY;
      }
    }
X
Xiaoyu Wang 已提交
447
  }
X
Xiaoyu Wang 已提交
448

X
Xiaoyu Wang 已提交
449 450 451 452 453 454 455 456 457
  if (TSDB_CODE_SUCCESS == code) {
    if (NULL != pScanLogicNode->pTagIndexCond) {
      pSubplan->pTagIndexCond = nodesCloneNode(pScanLogicNode->pTagIndexCond);
      if (NULL == pSubplan->pTagIndexCond) {
        code = TSDB_CODE_OUT_OF_MEMORY;
      }
    }
  }

X
Xiaoyu Wang 已提交
458 459 460
  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pScanPhysiNode;
  } else {
461
    nodesDestroyNode((SNode*)pScanPhysiNode);
X
Xiaoyu Wang 已提交
462
  }
X
Xiaoyu Wang 已提交
463

X
Xiaoyu Wang 已提交
464
  return code;
X
Xiaoyu Wang 已提交
465 466
}

467 468
static void vgroupInfoToNodeAddr(const SVgroupInfo* vg, SQueryNodeAddr* pNodeAddr) {
  pNodeAddr->nodeId = vg->vgId;
X
Xiaoyu Wang 已提交
469
  pNodeAddr->epSet = vg->epSet;
470 471
}

X
Xiaoyu Wang 已提交
472 473
static ENodeType getScanOperatorType(EScanType scanType) {
  switch (scanType) {
474 475
    case SCAN_TYPE_TAG:
      return QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
X
Xiaoyu Wang 已提交
476 477 478 479 480
    case SCAN_TYPE_TABLE:
      return QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
    case SCAN_TYPE_STREAM:
      return QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
    case SCAN_TYPE_TABLE_MERGE:
S
slzhou 已提交
481
      return QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN;
482 483
    case SCAN_TYPE_BLOCK_INFO:
      return QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN;
X
Xiaoyu Wang 已提交
484 485
    case SCAN_TYPE_LAST_ROW:
      return QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN;
X
Xiaoyu Wang 已提交
486 487 488 489 490 491
    default:
      break;
  }
  return QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
}

492 493 494 495 496 497 498 499 500 501 502 503 504
static int32_t createSimpleScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
                                         SPhysiNode** pPhyNode) {
  SScanPhysiNode* pScan =
      (SScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, getScanOperatorType(pScanLogicNode->scanType));
  if (NULL == pScan) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
  SQueryNodeLoad node = {.addr = pSubplan->execNode, .load = 0};
  taosArrayPush(pCxt->pExecNodeList, &node);
  return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, pScan, pPhyNode);
}

X
Xiaoyu Wang 已提交
505 506
static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
                                        SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
507 508
  STableScanPhysiNode* pTableScan = (STableScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode,
                                                                        getScanOperatorType(pScanLogicNode->scanType));
X
Xiaoyu Wang 已提交
509 510 511 512
  if (NULL == pTableScan) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

X
Xiaoyu Wang 已提交
513
  memcpy(pTableScan->scanSeq, pScanLogicNode->scanSeq, sizeof(pScanLogicNode->scanSeq));
X
Xiaoyu Wang 已提交
514
  pTableScan->scanRange = pScanLogicNode->scanRange;
515
  pTableScan->ratio = pScanLogicNode->ratio;
5
54liuyao 已提交
516 517 518 519
  if (pScanLogicNode->pVgroupList) {
    vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
    pSubplan->execNodeStat.tableNum = pScanLogicNode->pVgroupList->vgroups[0].numOfTable;
  }
D
dapan1121 已提交
520
  tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
521 522
  pTableScan->dataRequired = pScanLogicNode->dataRequired;
  pTableScan->pDynamicScanFuncs = nodesCloneList(pScanLogicNode->pDynamicScanFuncs);
523
  pTableScan->pGroupTags = nodesCloneList(pScanLogicNode->pGroupTags);
X
Xiaoyu Wang 已提交
524
  if ((NULL != pScanLogicNode->pDynamicScanFuncs && NULL == pTableScan->pDynamicScanFuncs) ||
525
      (NULL != pScanLogicNode->pGroupTags && NULL == pTableScan->pGroupTags)) {
526
    nodesDestroyNode((SNode*)pTableScan);
527 528
    return TSDB_CODE_OUT_OF_MEMORY;
  }
529
  pTableScan->groupSort = pScanLogicNode->groupSort;
X
Xiaoyu Wang 已提交
530 531 532 533 534
  pTableScan->interval = pScanLogicNode->interval;
  pTableScan->offset = pScanLogicNode->offset;
  pTableScan->sliding = pScanLogicNode->sliding;
  pTableScan->intervalUnit = pScanLogicNode->intervalUnit;
  pTableScan->slidingUnit = pScanLogicNode->slidingUnit;
5
54liuyao 已提交
535 536
  pTableScan->triggerType = pScanLogicNode->triggerType;
  pTableScan->watermark = pScanLogicNode->watermark;
537
  pTableScan->igExpired = pScanLogicNode->igExpired;
X
Xiaoyu Wang 已提交
538

X
Xiaoyu Wang 已提交
539
  return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode);
X
Xiaoyu Wang 已提交
540 541
}

X
Xiaoyu Wang 已提交
542 543
static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
                                              SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
544 545
  SSystemTableScanPhysiNode* pScan = (SSystemTableScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode,
                                                                               QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN);
X
Xiaoyu Wang 已提交
546 547 548 549
  if (NULL == pScan) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

D
dapan1121 已提交
550 551
  pScan->showRewrite = pScanLogicNode->showRewrite;
  pScan->accountId = pCxt->pPlanCxt->acctId;
552 553
  if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLES) ||
      0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED)) {
X
Xiaoyu Wang 已提交
554
    vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
X
Xiaoyu Wang 已提交
555
  }
D
dapan1121 已提交
556 557
  SQueryNodeLoad node = {.addr = {.nodeId = MNODE_HANDLE, .epSet = pCxt->pPlanCxt->mgmtEpSet}, .load = 0};
  taosArrayPush(pCxt->pExecNodeList, &node);
558 559 560 561 562
  if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_DNODE_VARIABLES)) {
    pScan->mgmtEpSet = pScanLogicNode->pVgroupList->vgroups->epSet;
  } else {
    pScan->mgmtEpSet = pCxt->pPlanCxt->mgmtEpSet;
  }
D
dapan1121 已提交
563
  tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
X
Xiaoyu Wang 已提交
564

X
Xiaoyu Wang 已提交
565
  return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
X
Xiaoyu Wang 已提交
566 567
}

X
Xiaoyu Wang 已提交
568 569
static int32_t createStreamScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
                                         SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
570 571 572 573 574 575
  return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
}

static int32_t createTableMergeScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
                                             SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) {
  return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
576 577
}

X
Xiaoyu Wang 已提交
578 579
static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
                                   SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
580 581
  switch (pScanLogicNode->scanType) {
    case SCAN_TYPE_TAG:
582
    case SCAN_TYPE_BLOCK_INFO:
X
Xiaoyu Wang 已提交
583
    case SCAN_TYPE_LAST_ROW:
584
      return createSimpleScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
585
    case SCAN_TYPE_TABLE:
X
Xiaoyu Wang 已提交
586
      return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
587
    case SCAN_TYPE_SYSTEM_TABLE:
X
Xiaoyu Wang 已提交
588
      return createSystemTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
589
    case SCAN_TYPE_STREAM:
X
Xiaoyu Wang 已提交
590
      return createStreamScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
591 592
    case SCAN_TYPE_TABLE_MERGE:
      return createTableMergeScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
593 594 595
    default:
      break;
  }
X
Xiaoyu Wang 已提交
596
  return TSDB_CODE_FAILED;
X
Xiaoyu Wang 已提交
597 598
}

X
Xiaoyu Wang 已提交
599 600
static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode,
                                   SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
601
  SJoinPhysiNode* pJoin =
602
      (SJoinPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pJoinLogicNode, QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN);
X
Xiaoyu Wang 已提交
603 604 605
  if (NULL == pJoin) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
606

607 608
  SDataBlockDescNode* pLeftDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc;
  SDataBlockDescNode* pRightDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 1))->pOutputDataBlockDesc;
X
Xiaoyu Wang 已提交
609
  int32_t             code = TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
610

611 612
  pJoin->joinType = pJoinLogicNode->joinType;
  if (NULL != pJoinLogicNode->pOnConditions) {
X
Xiaoyu Wang 已提交
613 614
    code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pOnConditions,
                         &pJoin->pOnConditions);
615
  }
X
Xiaoyu Wang 已提交
616
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
617 618
    code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->node.pTargets,
                         &pJoin->pTargets);
X
Xiaoyu Wang 已提交
619 620
  }
  if (TSDB_CODE_SUCCESS == code) {
X
bugfix  
Xiaoyu Wang 已提交
621
    code = addDataBlockSlots(pCxt, pJoin->pTargets, pJoin->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
622 623 624 625
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin);
  }
X
Xiaoyu Wang 已提交
626

X
Xiaoyu Wang 已提交
627 628 629
  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pJoin;
  } else {
630
    nodesDestroyNode((SNode*)pJoin);
X
Xiaoyu Wang 已提交
631
  }
X
Xiaoyu Wang 已提交
632

X
Xiaoyu Wang 已提交
633
  return code;
X
Xiaoyu Wang 已提交
634 635 636
}

typedef struct SRewritePrecalcExprsCxt {
X
Xiaoyu Wang 已提交
637 638 639
  int32_t    errCode;
  int32_t    planNodeId;
  int32_t    rewriteId;
X
Xiaoyu Wang 已提交
640 641 642 643 644
  SNodeList* pPrecalcExprs;
} SRewritePrecalcExprsCxt;

static EDealRes collectAndRewrite(SRewritePrecalcExprsCxt* pCxt, SNode** pNode) {
  SNode* pExpr = nodesCloneNode(*pNode);
X
bugfix  
Xiaoyu Wang 已提交
645
  if (NULL == pExpr) {
646
    pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
X
bugfix  
Xiaoyu Wang 已提交
647 648
    return DEAL_RES_ERROR;
  }
X
Xiaoyu Wang 已提交
649
  if (nodesListAppend(pCxt->pPrecalcExprs, pExpr)) {
650
    pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
X
Xiaoyu Wang 已提交
651 652 653 654 655
    nodesDestroyNode(pExpr);
    return DEAL_RES_ERROR;
  }
  SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
  if (NULL == pCol) {
656
    pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
X
Xiaoyu Wang 已提交
657 658 659 660 661 662 663 664
    nodesDestroyNode(pExpr);
    return DEAL_RES_ERROR;
  }
  SExprNode* pRewrittenExpr = (SExprNode*)pExpr;
  pCol->node.resType = pRewrittenExpr->resType;
  if ('\0' != pRewrittenExpr->aliasName[0]) {
    strcpy(pCol->colName, pRewrittenExpr->aliasName);
  } else {
X
Xiaoyu Wang 已提交
665 666
    snprintf(pRewrittenExpr->aliasName, sizeof(pRewrittenExpr->aliasName), "#expr_%d_%d", pCxt->planNodeId,
             pCxt->rewriteId);
X
Xiaoyu Wang 已提交
667 668 669 670 671 672 673
    strcpy(pCol->colName, pRewrittenExpr->aliasName);
  }
  nodesDestroyNode(*pNode);
  *pNode = (SNode*)pCol;
  return DEAL_RES_IGNORE_CHILD;
}

674 675 676 677 678
static int32_t rewriteValueToOperator(SRewritePrecalcExprsCxt* pCxt, SNode** pNode) {
  SOperatorNode* pOper = (SOperatorNode*)nodesMakeNode(QUERY_NODE_OPERATOR);
  if (NULL == pOper) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
679 680
  pOper->pLeft = nodesMakeNode(QUERY_NODE_LEFT_VALUE);
  if (NULL == pOper->pLeft) {
681
    nodesDestroyNode((SNode*)pOper);
682 683
    return TSDB_CODE_OUT_OF_MEMORY;
  }
684 685 686 687
  SValueNode* pVal = (SValueNode*)*pNode;
  pOper->node.resType = pVal->node.resType;
  strcpy(pOper->node.aliasName, pVal->node.aliasName);
  pOper->opType = OP_TYPE_ASSIGN;
688
  pOper->pRight = *pNode;
689 690 691 692
  *pNode = (SNode*)pOper;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
693 694 695
static EDealRes doRewritePrecalcExprs(SNode** pNode, void* pContext) {
  SRewritePrecalcExprsCxt* pCxt = (SRewritePrecalcExprsCxt*)pContext;
  switch (nodeType(*pNode)) {
696
    case QUERY_NODE_VALUE: {
697 698 699
      if (((SValueNode*)*pNode)->notReserved) {
        break;
      }
700 701
      pCxt->errCode = rewriteValueToOperator(pCxt, pNode);
      if (TSDB_CODE_SUCCESS != pCxt->errCode) {
702 703 704 705
        return DEAL_RES_ERROR;
      }
      return collectAndRewrite(pCxt, pNode);
    }
X
Xiaoyu Wang 已提交
706 707
    case QUERY_NODE_OPERATOR:
    case QUERY_NODE_LOGIC_CONDITION: {
708
      return collectAndRewrite(pCxt, pNode);
X
Xiaoyu Wang 已提交
709 710
    }
    case QUERY_NODE_FUNCTION: {
711
      if (fmIsScalarFunc(((SFunctionNode*)(*pNode))->funcId)) {
712
        return collectAndRewrite(pCxt, pNode);
X
Xiaoyu Wang 已提交
713 714 715 716 717 718 719 720
      }
    }
    default:
      break;
  }
  return DEAL_RES_CONTINUE;
}

X
Xiaoyu Wang 已提交
721 722
static int32_t rewritePrecalcExprs(SPhysiPlanContext* pCxt, SNodeList* pList, SNodeList** pPrecalcExprs,
                                   SNodeList** pRewrittenList) {
X
Xiaoyu Wang 已提交
723 724 725 726 727 728
  if (NULL == pList) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == *pPrecalcExprs) {
    *pPrecalcExprs = nodesMakeList();
X
bugfix  
Xiaoyu Wang 已提交
729 730 731
    if (NULL == *pPrecalcExprs) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
X
Xiaoyu Wang 已提交
732 733 734
  }
  if (NULL == *pRewrittenList) {
    *pRewrittenList = nodesMakeList();
X
bugfix  
Xiaoyu Wang 已提交
735 736 737
    if (NULL == *pRewrittenList) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
X
Xiaoyu Wang 已提交
738 739 740 741 742 743 744 745 746
  }
  SNode* pNode = NULL;
  FOREACH(pNode, pList) {
    SNode* pNew = NULL;
    if (QUERY_NODE_GROUPING_SET == nodeType(pNode)) {
      pNew = nodesCloneNode(nodesListGetNode(((SGroupingSetNode*)pNode)->pParameterList, 0));
    } else {
      pNew = nodesCloneNode(pNode);
    }
X
bugfix  
Xiaoyu Wang 已提交
747 748 749 750 751 752
    if (NULL == pNew) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    if (TSDB_CODE_SUCCESS != nodesListAppend(*pRewrittenList, pNew)) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
X
Xiaoyu Wang 已提交
753
  }
X
Xiaoyu Wang 已提交
754
  SRewritePrecalcExprsCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pPrecalcExprs = *pPrecalcExprs};
X
Xiaoyu Wang 已提交
755
  nodesRewriteExprs(*pRewrittenList, doRewritePrecalcExprs, &cxt);
756
  if (0 == LIST_LENGTH(cxt.pPrecalcExprs) || TSDB_CODE_SUCCESS != cxt.errCode) {
X
Xiaoyu Wang 已提交
757
    NODES_DESTORY_LIST(*pPrecalcExprs);
X
Xiaoyu Wang 已提交
758 759 760 761
  }
  return cxt.errCode;
}

X
Xiaoyu Wang 已提交
762 763
static int32_t rewritePrecalcExpr(SPhysiPlanContext* pCxt, SNode* pNode, SNodeList** pPrecalcExprs,
                                  SNode** pRewritten) {
764 765 766 767 768
  if (NULL == pNode) {
    return TSDB_CODE_SUCCESS;
  }

  SNodeList* pList = NULL;
X
Xiaoyu Wang 已提交
769
  int32_t    code = nodesListMakeAppend(&pList, pNode);
770 771 772 773 774 775 776 777 778 779 780 781
  SNodeList* pRewrittenList = NULL;
  if (TSDB_CODE_SUCCESS == code) {
    code = rewritePrecalcExprs(pCxt, pList, pPrecalcExprs, &pRewrittenList);
  }
  if (TSDB_CODE_SUCCESS == code) {
    *pRewritten = nodesListGetNode(pRewrittenList, 0);
  }
  nodesClearList(pList);
  nodesClearList(pRewrittenList);
  return code;
}

X
Xiaoyu Wang 已提交
782 783
static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SAggLogicNode* pAggLogicNode,
                                  SPhysiNode** pPhyNode) {
784 785
  SAggPhysiNode* pAgg =
      (SAggPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pAggLogicNode, QUERY_NODE_PHYSICAL_PLAN_HASH_AGG);
X
Xiaoyu Wang 已提交
786 787 788
  if (NULL == pAgg) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
789 790 791 792

  SNodeList* pPrecalcExprs = NULL;
  SNodeList* pGroupKeys = NULL;
  SNodeList* pAggFuncs = NULL;
X
Xiaoyu Wang 已提交
793
  int32_t    code = rewritePrecalcExprs(pCxt, pAggLogicNode->pGroupKeys, &pPrecalcExprs, &pGroupKeys);
X
Xiaoyu Wang 已提交
794 795 796
  if (TSDB_CODE_SUCCESS == code) {
    code = rewritePrecalcExprs(pCxt, pAggLogicNode->pAggFuncs, &pPrecalcExprs, &pAggFuncs);
  }
X
Xiaoyu Wang 已提交
797

X
Xiaoyu Wang 已提交
798 799
  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  // push down expression to pOutputDataBlockDesc of child node
X
Xiaoyu Wang 已提交
800 801 802
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pAgg->pExprs);
    if (TSDB_CODE_SUCCESS == code) {
X
bugfix  
Xiaoyu Wang 已提交
803
      code = pushdownDataBlockSlots(pCxt, pAgg->pExprs, pChildTupe);
X
Xiaoyu Wang 已提交
804
    }
X
Xiaoyu Wang 已提交
805 806
  }

X
Xiaoyu Wang 已提交
807 808 809
  if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pGroupKeys, &pAgg->pGroupKeys);
    if (TSDB_CODE_SUCCESS == code) {
X
bugfix  
Xiaoyu Wang 已提交
810
      code = addDataBlockSlots(pCxt, pAgg->pGroupKeys, pAgg->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
811
    }
X
Xiaoyu Wang 已提交
812 813
  }

X
Xiaoyu Wang 已提交
814 815 816
  if (TSDB_CODE_SUCCESS == code && NULL != pAggFuncs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pAggFuncs, &pAgg->pAggFuncs);
    if (TSDB_CODE_SUCCESS == code) {
X
bugfix  
Xiaoyu Wang 已提交
817
      code = addDataBlockSlots(pCxt, pAgg->pAggFuncs, pAgg->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
818
    }
X
Xiaoyu Wang 已提交
819 820
  }

X
Xiaoyu Wang 已提交
821 822 823
  if (TSDB_CODE_SUCCESS == code) {
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pAggLogicNode, (SPhysiNode*)pAgg);
  }
X
Xiaoyu Wang 已提交
824

X
Xiaoyu Wang 已提交
825 826 827
  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pAgg;
  } else {
828
    nodesDestroyNode((SNode*)pAgg);
X
Xiaoyu Wang 已提交
829
  }
X
Xiaoyu Wang 已提交
830

X
bugfix  
Xiaoyu Wang 已提交
831 832 833 834
  nodesDestroyList(pPrecalcExprs);
  nodesDestroyList(pGroupKeys);
  nodesDestroyList(pAggFuncs);

X
Xiaoyu Wang 已提交
835
  return code;
X
Xiaoyu Wang 已提交
836 837
}

838 839 840
static int32_t createIndefRowsFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
                                            SIndefRowsFuncLogicNode* pFuncLogicNode, SPhysiNode** pPhyNode) {
  SIndefRowsFuncPhysiNode* pIdfRowsFunc = (SIndefRowsFuncPhysiNode*)makePhysiNode(
X
Xiaoyu Wang 已提交
841
      pCxt, (SLogicNode*)pFuncLogicNode, QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC);
842 843 844 845 846
  if (NULL == pIdfRowsFunc) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNodeList* pPrecalcExprs = NULL;
X
Xiaoyu Wang 已提交
847 848
  SNodeList* pFuncs = NULL;
  int32_t    code = rewritePrecalcExprs(pCxt, pFuncLogicNode->pFuncs, &pPrecalcExprs, &pFuncs);
849 850 851 852 853 854 855 856 857 858

  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  // push down expression to pOutputDataBlockDesc of child node
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pIdfRowsFunc->pExprs);
    if (TSDB_CODE_SUCCESS == code) {
      code = pushdownDataBlockSlots(pCxt, pIdfRowsFunc->pExprs, pChildTupe);
    }
  }

X
Xiaoyu Wang 已提交
859 860
  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncs, &pIdfRowsFunc->pFuncs);
861
    if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
862
      code = addDataBlockSlots(pCxt, pIdfRowsFunc->pFuncs, pIdfRowsFunc->node.pOutputDataBlockDesc);
863 864 865 866 867 868
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pIdfRowsFunc;
  } else {
869
    nodesDestroyNode((SNode*)pIdfRowsFunc);
870 871
  }

X
Xiaoyu Wang 已提交
872 873 874
  nodesDestroyList(pPrecalcExprs);
  nodesDestroyList(pFuncs);

875 876 877
  return code;
}

X
Xiaoyu Wang 已提交
878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908
static int32_t createInterpFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
                                         SInterpFuncLogicNode* pFuncLogicNode, SPhysiNode** pPhyNode) {
  SInterpFuncPhysiNode* pInterpFunc =
      (SInterpFuncPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pFuncLogicNode, QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC);
  if (NULL == pInterpFunc) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNodeList* pPrecalcExprs = NULL;
  SNodeList* pFuncs = NULL;
  int32_t    code = rewritePrecalcExprs(pCxt, pFuncLogicNode->pFuncs, &pPrecalcExprs, &pFuncs);

  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  // push down expression to pOutputDataBlockDesc of child node
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pInterpFunc->pExprs);
    if (TSDB_CODE_SUCCESS == code) {
      code = pushdownDataBlockSlots(pCxt, pInterpFunc->pExprs, pChildTupe);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncs, &pInterpFunc->pFuncs);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlots(pCxt, pInterpFunc->pFuncs, pInterpFunc->node.pOutputDataBlockDesc);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    pInterpFunc->timeRange = pFuncLogicNode->timeRange;
    pInterpFunc->interval = pFuncLogicNode->interval;
X
Xiaoyu Wang 已提交
909 910
    pInterpFunc->fillMode = pFuncLogicNode->fillMode;
    pInterpFunc->pFillValues = nodesCloneNode(pFuncLogicNode->pFillValues);
X
Xiaoyu Wang 已提交
911
    if (NULL != pFuncLogicNode->pFillValues && NULL == pInterpFunc->pFillValues) {
X
Xiaoyu Wang 已提交
912 913
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
X
Xiaoyu Wang 已提交
914 915
  }

X
Xiaoyu Wang 已提交
916 917 918 919
  if (TSDB_CODE_SUCCESS == code) {
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncLogicNode->pTimeSeries, &pInterpFunc->pTimeSeries);
  }

X
Xiaoyu Wang 已提交
920 921 922 923 924 925
  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pInterpFunc;
  } else {
    nodesDestroyNode((SNode*)pInterpFunc);
  }

X
Xiaoyu Wang 已提交
926 927 928
  nodesDestroyList(pPrecalcExprs);
  nodesDestroyList(pFuncs);

X
Xiaoyu Wang 已提交
929 930 931
  return code;
}

X
Xiaoyu Wang 已提交
932 933
static int32_t createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
                                      SProjectLogicNode* pProjectLogicNode, SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
934 935
  SProjectPhysiNode* pProject =
      (SProjectPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pProjectLogicNode, QUERY_NODE_PHYSICAL_PLAN_PROJECT);
X
Xiaoyu Wang 已提交
936 937 938
  if (NULL == pProject) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
939

940 941 942 943 944 945 946 947 948 949
  int32_t code = TSDB_CODE_SUCCESS;
  if (0 == LIST_LENGTH(pChildren)) {
    pProject->pProjections = nodesCloneList(pProjectLogicNode->pProjections);
    if (NULL == pProject->pProjections) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  } else {
    code = setListSlotId(pCxt, ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc->dataBlockId, -1,
                         pProjectLogicNode->pProjections, &pProject->pProjections);
  }
X
Xiaoyu Wang 已提交
950
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
951 952
    code = addDataBlockSlotsForProject(pCxt, pProjectLogicNode->stmtName, pProject->pProjections,
                                       pProject->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
953 954 955 956
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pProjectLogicNode, (SPhysiNode*)pProject);
  }
X
Xiaoyu Wang 已提交
957

X
Xiaoyu Wang 已提交
958 959 960
  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pProject;
  } else {
961
    nodesDestroyNode((SNode*)pProject);
X
Xiaoyu Wang 已提交
962
  }
X
Xiaoyu Wang 已提交
963

X
Xiaoyu Wang 已提交
964
  return code;
X
Xiaoyu Wang 已提交
965 966
}

X
Xiaoyu Wang 已提交
967 968
static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode,
                                         SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
969 970
  SExchangePhysiNode* pExchange =
      (SExchangePhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE);
X
Xiaoyu Wang 已提交
971 972 973
  if (NULL == pExchange) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
974

X
Xiaoyu Wang 已提交
975
  pExchange->srcGroupId = pExchangeLogicNode->srcGroupId;
X
bugfix  
Xiaoyu Wang 已提交
976
  *pPhyNode = (SPhysiNode*)pExchange;
X
Xiaoyu Wang 已提交
977

X
bugfix  
Xiaoyu Wang 已提交
978
  return TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
979
}
X
Xiaoyu Wang 已提交
980

X
Xiaoyu Wang 已提交
981 982
static int32_t createStreamScanPhysiNodeByExchange(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode,
                                                   SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
983 984
  SScanPhysiNode* pScan =
      (SScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
X
Xiaoyu Wang 已提交
985 986 987
  if (NULL == pScan) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
988

X
bugfix  
Xiaoyu Wang 已提交
989 990 991 992 993 994
  int32_t code = TSDB_CODE_SUCCESS;

  pScan->pScanCols = nodesCloneList(pExchangeLogicNode->node.pTargets);
  if (NULL == pScan->pScanCols) {
    code = TSDB_CODE_OUT_OF_MEMORY;
  }
L
fix  
Liu Jicong 已提交
995 996 997 998 999

  if (TSDB_CODE_SUCCESS == code) {
    code = sortScanCols(pScan->pScanCols);
  }

1000 1001 1002
  if (TSDB_CODE_SUCCESS == code) {
    code = sortScanCols(pScan->pScanCols);
  }
X
Xiaoyu Wang 已提交
1003
  if (TSDB_CODE_SUCCESS == code) {
X
bugfix  
Xiaoyu Wang 已提交
1004
    code = addDataBlockSlots(pCxt, pScan->pScanCols, pScan->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
1005 1006 1007 1008 1009
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pScan;
  } else {
1010
    nodesDestroyNode((SNode*)pScan);
X
Xiaoyu Wang 已提交
1011 1012 1013 1014 1015
  }

  return code;
}

X
Xiaoyu Wang 已提交
1016 1017
static int32_t createExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode,
                                       SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
1018 1019
  if (pCxt->pPlanCxt->streamQuery) {
    return createStreamScanPhysiNodeByExchange(pCxt, pExchangeLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1020
  } else {
X
Xiaoyu Wang 已提交
1021
    return doCreateExchangePhysiNode(pCxt, pExchangeLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1022
  }
X
Xiaoyu Wang 已提交
1023 1024
}

X
Xiaoyu Wang 已提交
1025 1026
static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWinodwPhysiNode* pWindow,
                                             SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
1027 1028
  SNodeList* pPrecalcExprs = NULL;
  SNodeList* pFuncs = NULL;
X
Xiaoyu Wang 已提交
1029
  int32_t    code = rewritePrecalcExprs(pCxt, pWindowLogicNode->pFuncs, &pPrecalcExprs, &pFuncs);
X
Xiaoyu Wang 已提交
1030 1031 1032 1033 1034 1035

  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  // push down expression to pOutputDataBlockDesc of child node
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pWindow->pExprs);
    if (TSDB_CODE_SUCCESS == code) {
X
bugfix  
Xiaoyu Wang 已提交
1036
      code = addDataBlockSlots(pCxt, pWindow->pExprs, pChildTupe);
X
Xiaoyu Wang 已提交
1037 1038 1039
    }
  }

1040 1041 1042
  if (TSDB_CODE_SUCCESS == code) {
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pWindowLogicNode->pTspk, &pWindow->pTspk);
  }
5
54liuyao 已提交
1043 1044 1045
  if (TSDB_CODE_SUCCESS == code && pWindowLogicNode->pTsEnd) {
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pWindowLogicNode->pTsEnd, &pWindow->pTsEnd);
  }
1046

X
Xiaoyu Wang 已提交
1047 1048 1049
  if (TSDB_CODE_SUCCESS == code && NULL != pFuncs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncs, &pWindow->pFuncs);
    if (TSDB_CODE_SUCCESS == code) {
X
bugfix  
Xiaoyu Wang 已提交
1050
      code = addDataBlockSlots(pCxt, pWindow->pFuncs, pWindow->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
1051 1052 1053
    }
  }

X
Xiaoyu Wang 已提交
1054 1055
  pWindow->triggerType = pWindowLogicNode->triggerType;
  pWindow->watermark = pWindowLogicNode->watermark;
1056
  pWindow->igExpired = pWindowLogicNode->igExpired;
X
Xiaoyu Wang 已提交
1057

X
Xiaoyu Wang 已提交
1058 1059 1060
  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pWindow;
  } else {
1061
    nodesDestroyNode((SNode*)pWindow);
X
Xiaoyu Wang 已提交
1062 1063
  }

X
Xiaoyu Wang 已提交
1064 1065 1066
  nodesDestroyList(pPrecalcExprs);
  nodesDestroyList(pFuncs);

X
Xiaoyu Wang 已提交
1067 1068 1069
  return code;
}

1070 1071
static ENodeType getIntervalOperatorType(EWindowAlgorithm windowAlgo) {
  switch (windowAlgo) {
X
Xiaoyu Wang 已提交
1072 1073
    case INTERVAL_ALGO_HASH:
      return QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL;
X
Xiaoyu Wang 已提交
1074
    case INTERVAL_ALGO_MERGE:
1075
      return QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL;
X
Xiaoyu Wang 已提交
1076 1077 1078 1079 1080 1081
    case INTERVAL_ALGO_STREAM_FINAL:
      return QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL;
    case INTERVAL_ALGO_STREAM_SEMI:
      return QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL;
    case INTERVAL_ALGO_STREAM_SINGLE:
      return QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL;
1082 1083 1084 1085 1086 1087 1088 1089
    case SESSION_ALGO_STREAM_FINAL:
      return QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION;
    case SESSION_ALGO_STREAM_SEMI:
      return QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION;
    case SESSION_ALGO_STREAM_SINGLE:
      return QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
    case SESSION_ALGO_MERGE:
      return QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION;
X
Xiaoyu Wang 已提交
1090 1091
    default:
      break;
1092
  }
X
Xiaoyu Wang 已提交
1093
  return QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL;
1094 1095
}

X
Xiaoyu Wang 已提交
1096 1097 1098
static int32_t createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
                                       SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
  SIntervalPhysiNode* pInterval = (SIntervalPhysiNode*)makePhysiNode(
1099
      pCxt, (SLogicNode*)pWindowLogicNode, getIntervalOperatorType(pWindowLogicNode->windowAlgo));
X
Xiaoyu Wang 已提交
1100 1101 1102
  if (NULL == pInterval) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
1103 1104 1105 1106

  pInterval->interval = pWindowLogicNode->interval;
  pInterval->offset = pWindowLogicNode->offset;
  pInterval->sliding = pWindowLogicNode->sliding;
H
Haojun Liao 已提交
1107 1108 1109
  pInterval->intervalUnit = pWindowLogicNode->intervalUnit;
  pInterval->slidingUnit = pWindowLogicNode->slidingUnit;

X
Xiaoyu Wang 已提交
1110 1111 1112
  return createWindowPhysiNodeFinalize(pCxt, pChildren, &pInterval->window, pWindowLogicNode, pPhyNode);
}

X
Xiaoyu Wang 已提交
1113 1114 1115
static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
                                            SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
  SSessionWinodwPhysiNode* pSession = (SSessionWinodwPhysiNode*)makePhysiNode(
1116
      pCxt, (SLogicNode*)pWindowLogicNode, getIntervalOperatorType(pWindowLogicNode->windowAlgo));
X
Xiaoyu Wang 已提交
1117 1118
  if (NULL == pSession) {
    return TSDB_CODE_OUT_OF_MEMORY;
X
Xiaoyu Wang 已提交
1119 1120
  }

X
Xiaoyu Wang 已提交
1121
  pSession->gap = pWindowLogicNode->sessionGap;
X
Xiaoyu Wang 已提交
1122

X
Xiaoyu Wang 已提交
1123
  return createWindowPhysiNodeFinalize(pCxt, pChildren, &pSession->window, pWindowLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1124 1125
}

X
Xiaoyu Wang 已提交
1126 1127
static int32_t createStateWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
                                          SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
1128 1129 1130
  SStateWinodwPhysiNode* pState = (SStateWinodwPhysiNode*)makePhysiNode(
      pCxt, (SLogicNode*)pWindowLogicNode,
      (pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE : QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE));
1131 1132 1133 1134 1135
  if (NULL == pState) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNodeList* pPrecalcExprs = NULL;
X
Xiaoyu Wang 已提交
1136 1137
  SNode*     pStateKey = NULL;
  int32_t    code = rewritePrecalcExpr(pCxt, pWindowLogicNode->pStateExpr, &pPrecalcExprs, &pStateKey);
1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155

  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  // push down expression to pOutputDataBlockDesc of child node
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pState->window.pExprs);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlots(pCxt, pState->window.pExprs, pChildTupe);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pStateKey, &pState->pStateKey);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlot(pCxt, &pState->pStateKey, pState->window.node.pOutputDataBlockDesc);
    }
  }

  if (TSDB_CODE_SUCCESS != code) {
1156
    nodesDestroyNode((SNode*)pState);
1157 1158 1159 1160 1161 1162
    return code;
  }

  return createWindowPhysiNodeFinalize(pCxt, pChildren, &pState->window, pWindowLogicNode, pPhyNode);
}

X
Xiaoyu Wang 已提交
1163 1164
static int32_t createWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode,
                                     SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
1165 1166
  switch (pWindowLogicNode->winType) {
    case WINDOW_TYPE_INTERVAL:
X
Xiaoyu Wang 已提交
1167
      return createIntervalPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1168
    case WINDOW_TYPE_SESSION:
X
Xiaoyu Wang 已提交
1169
      return createSessionWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1170
    case WINDOW_TYPE_STATE:
1171
      return createStateWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1172 1173 1174
    default:
      break;
  }
X
Xiaoyu Wang 已提交
1175
  return TSDB_CODE_FAILED;
X
Xiaoyu Wang 已提交
1176 1177
}

X
Xiaoyu Wang 已提交
1178 1179
static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SSortLogicNode* pSortLogicNode,
                                   SPhysiNode** pPhyNode) {
1180 1181 1182
  SSortPhysiNode* pSort = (SSortPhysiNode*)makePhysiNode(
      pCxt, (SLogicNode*)pSortLogicNode,
      pSortLogicNode->groupSort ? QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT : QUERY_NODE_PHYSICAL_PLAN_SORT);
X
Xiaoyu Wang 已提交
1183 1184 1185 1186 1187 1188
  if (NULL == pSort) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNodeList* pPrecalcExprs = NULL;
  SNodeList* pSortKeys = NULL;
X
Xiaoyu Wang 已提交
1189
  int32_t    code = rewritePrecalcExprs(pCxt, pSortLogicNode->pSortKeys, &pPrecalcExprs, &pSortKeys);
X
Xiaoyu Wang 已提交
1190 1191 1192 1193 1194 1195

  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  // push down expression to pOutputDataBlockDesc of child node
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pSort->pExprs);
    if (TSDB_CODE_SUCCESS == code) {
1196
      code = pushdownDataBlockSlots(pCxt, pSort->pExprs, pChildTupe);
X
Xiaoyu Wang 已提交
1197 1198 1199 1200 1201
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pSortKeys, &pSort->pSortKeys);
X
Xiaoyu Wang 已提交
1202 1203 1204 1205
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pSortLogicNode->node.pTargets, &pSort->pTargets);
X
Xiaoyu Wang 已提交
1206
    if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
1207
      code = addDataBlockSlots(pCxt, pSort->pTargets, pSort->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
1208 1209 1210 1211 1212 1213
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pSort;
  } else {
1214
    nodesDestroyNode((SNode*)pSort);
X
Xiaoyu Wang 已提交
1215 1216
  }

X
Xiaoyu Wang 已提交
1217 1218 1219
  nodesDestroyList(pPrecalcExprs);
  nodesDestroyList(pSortKeys);

X
Xiaoyu Wang 已提交
1220 1221 1222
  return code;
}

X
Xiaoyu Wang 已提交
1223 1224
static int32_t createPartitionPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
                                        SPartitionLogicNode* pPartLogicNode, SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
1225 1226
  SPartitionPhysiNode* pPart =
      (SPartitionPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pPartLogicNode, QUERY_NODE_PHYSICAL_PLAN_PARTITION);
1227 1228 1229 1230 1231 1232
  if (NULL == pPart) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNodeList* pPrecalcExprs = NULL;
  SNodeList* pPartitionKeys = NULL;
X
Xiaoyu Wang 已提交
1233
  int32_t    code = rewritePrecalcExprs(pCxt, pPartLogicNode->pPartitionKeys, &pPrecalcExprs, &pPartitionKeys);
1234 1235 1236 1237 1238 1239

  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  // push down expression to pOutputDataBlockDesc of child node
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pPart->pExprs);
    if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
1240
      code = pushdownDataBlockSlots(pCxt, pPart->pExprs, pChildTupe);
1241 1242 1243 1244 1245
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPartitionKeys, &pPart->pPartitionKeys);
X
Xiaoyu Wang 已提交
1246 1247 1248 1249
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPartLogicNode->node.pTargets, &pPart->pTargets);
1250
    if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
1251
      code = addDataBlockSlots(pCxt, pPart->pTargets, pPart->node.pOutputDataBlockDesc);
1252 1253 1254 1255 1256 1257
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pPart;
  } else {
1258
    nodesDestroyNode((SNode*)pPart);
1259 1260
  }

X
Xiaoyu Wang 已提交
1261 1262 1263
  nodesDestroyList(pPrecalcExprs);
  nodesDestroyList(pPartitionKeys);

1264 1265 1266
  return code;
}

X
Xiaoyu Wang 已提交
1267 1268
static int32_t createFillPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SFillLogicNode* pFillNode,
                                   SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
1269
  SFillPhysiNode* pFill = (SFillPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pFillNode, QUERY_NODE_PHYSICAL_PLAN_FILL);
X
Xiaoyu Wang 已提交
1270 1271 1272 1273
  if (NULL == pFill) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

X
Xiaoyu Wang 已提交
1274 1275 1276
  pFill->mode = pFillNode->mode;
  pFill->timeRange = pFillNode->timeRange;

X
Xiaoyu Wang 已提交
1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299
  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  int32_t code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFillNode->node.pTargets, &pFill->pTargets);
  if (TSDB_CODE_SUCCESS == code) {
    code = addDataBlockSlots(pCxt, pFill->pTargets, pFill->node.pOutputDataBlockDesc);
  }

  if (TSDB_CODE_SUCCESS == code) {
    pFill->pWStartTs = nodesCloneNode(pFillNode->pWStartTs);
    if (NULL == pFill->pWStartTs) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  if (TSDB_CODE_SUCCESS == code && NULL != pFillNode->pValues) {
    pFill->pValues = nodesCloneNode(pFillNode->pValues);
    if (NULL == pFill->pValues) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pFill;
  } else {
1300
    nodesDestroyNode((SNode*)pFill);
X
Xiaoyu Wang 已提交
1301 1302 1303 1304 1305
  }

  return code;
}

X
Xiaoyu Wang 已提交
1306
static int32_t createExchangePhysiNodeByMerge(SMergePhysiNode* pMerge) {
1307
  SExchangePhysiNode* pExchange = (SExchangePhysiNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_EXCHANGE);
X
Xiaoyu Wang 已提交
1308 1309 1310 1311
  if (NULL == pExchange) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pExchange->srcGroupId = pMerge->srcGroupId;
D
dapan1121 已提交
1312
  pExchange->singleChannel = true;
X
Xiaoyu Wang 已提交
1313
  pExchange->node.pParent = (SPhysiNode*)pMerge;
1314
  pExchange->node.pOutputDataBlockDesc = (SDataBlockDescNode*)nodesCloneNode((SNode*)pMerge->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
1315
  if (NULL == pExchange->node.pOutputDataBlockDesc) {
1316
    nodesDestroyNode((SNode*)pExchange);
X
Xiaoyu Wang 已提交
1317 1318
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
1319 1320
  SNode* pSlot = NULL;
  FOREACH(pSlot, pExchange->node.pOutputDataBlockDesc->pSlots) { ((SSlotDescNode*)pSlot)->output = true; }
1321
  return nodesListMakeStrictAppend(&pMerge->node.pChildren, (SNode*)pExchange);
X
Xiaoyu Wang 已提交
1322 1323 1324
}

static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SMergeLogicNode* pMergeLogicNode, SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
1325 1326
  SMergePhysiNode* pMerge =
      (SMergePhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pMergeLogicNode, QUERY_NODE_PHYSICAL_PLAN_MERGE);
X
Xiaoyu Wang 已提交
1327 1328 1329 1330 1331 1332
  if (NULL == pMerge) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pMerge->numOfChannels = pMergeLogicNode->numOfChannels;
  pMerge->srcGroupId = pMergeLogicNode->srcGroupId;
1333
  pMerge->groupSort = pMergeLogicNode->groupSort;
X
Xiaoyu Wang 已提交
1334

X
Xiaoyu Wang 已提交
1335
  int32_t code = addDataBlockSlots(pCxt, pMergeLogicNode->pInputs, pMerge->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
1336

X
Xiaoyu Wang 已提交
1337 1338 1339 1340 1341 1342
  if (TSDB_CODE_SUCCESS == code) {
    for (int32_t i = 0; i < pMerge->numOfChannels; ++i) {
      code = createExchangePhysiNodeByMerge(pMerge);
      if (TSDB_CODE_SUCCESS != code) {
        break;
      }
X
Xiaoyu Wang 已提交
1343 1344 1345
    }
  }

1346
  if (TSDB_CODE_SUCCESS == code && NULL != pMergeLogicNode->pMergeKeys) {
X
Xiaoyu Wang 已提交
1347 1348 1349 1350
    code = setListSlotId(pCxt, pMerge->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->pMergeKeys,
                         &pMerge->pMergeKeys);
  }

X
Xiaoyu Wang 已提交
1351 1352 1353 1354
  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pMerge->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->node.pTargets,
                         &pMerge->pTargets);
  }
X
Xiaoyu Wang 已提交
1355 1356 1357
  if (TSDB_CODE_SUCCESS == code) {
    code = addDataBlockSlots(pCxt, pMerge->pTargets, pMerge->node.pOutputDataBlockDesc);
  }
X
Xiaoyu Wang 已提交
1358

X
Xiaoyu Wang 已提交
1359 1360 1361
  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pMerge;
  } else {
1362
    nodesDestroyNode((SNode*)pMerge);
X
Xiaoyu Wang 已提交
1363 1364 1365 1366 1367
  }

  return code;
}

X
Xiaoyu Wang 已提交
1368 1369
static int32_t doCreatePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, SSubplan* pSubplan,
                                 SNodeList* pChildren, SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
1370
  switch (nodeType(pLogicNode)) {
X
Xiaoyu Wang 已提交
1371
    case QUERY_NODE_LOGIC_PLAN_SCAN:
X
Xiaoyu Wang 已提交
1372
      return createScanPhysiNode(pCxt, pSubplan, (SScanLogicNode*)pLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1373
    case QUERY_NODE_LOGIC_PLAN_JOIN:
X
Xiaoyu Wang 已提交
1374
      return createJoinPhysiNode(pCxt, pChildren, (SJoinLogicNode*)pLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1375
    case QUERY_NODE_LOGIC_PLAN_AGG:
X
Xiaoyu Wang 已提交
1376
      return createAggPhysiNode(pCxt, pChildren, (SAggLogicNode*)pLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1377
    case QUERY_NODE_LOGIC_PLAN_PROJECT:
X
Xiaoyu Wang 已提交
1378
      return createProjectPhysiNode(pCxt, pChildren, (SProjectLogicNode*)pLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1379
    case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
X
Xiaoyu Wang 已提交
1380
      return createExchangePhysiNode(pCxt, (SExchangeLogicNode*)pLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1381
    case QUERY_NODE_LOGIC_PLAN_WINDOW:
X
Xiaoyu Wang 已提交
1382
      return createWindowPhysiNode(pCxt, pChildren, (SWindowLogicNode*)pLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1383 1384
    case QUERY_NODE_LOGIC_PLAN_SORT:
      return createSortPhysiNode(pCxt, pChildren, (SSortLogicNode*)pLogicNode, pPhyNode);
1385 1386
    case QUERY_NODE_LOGIC_PLAN_PARTITION:
      return createPartitionPhysiNode(pCxt, pChildren, (SPartitionLogicNode*)pLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1387 1388
    case QUERY_NODE_LOGIC_PLAN_FILL:
      return createFillPhysiNode(pCxt, pChildren, (SFillLogicNode*)pLogicNode, pPhyNode);
1389 1390
    case QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC:
      return createIndefRowsFuncPhysiNode(pCxt, pChildren, (SIndefRowsFuncLogicNode*)pLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1391 1392
    case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC:
      return createInterpFuncPhysiNode(pCxt, pChildren, (SInterpFuncLogicNode*)pLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1393 1394
    case QUERY_NODE_LOGIC_PLAN_MERGE:
      return createMergePhysiNode(pCxt, (SMergeLogicNode*)pLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1395 1396 1397
    default:
      break;
  }
X
Xiaoyu Wang 已提交
1398 1399 1400 1401

  return TSDB_CODE_FAILED;
}

X
Xiaoyu Wang 已提交
1402 1403
static int32_t createPhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, SSubplan* pSubplan,
                               SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415
  SNodeList* pChildren = nodesMakeList();
  if (NULL == pChildren) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  SNode* pLogicChild;
  FOREACH(pLogicChild, pLogicNode->pChildren) {
    SPhysiNode* pChild = NULL;
    code = createPhysiNode(pCxt, (SLogicNode*)pLogicChild, pSubplan, &pChild);
    if (TSDB_CODE_SUCCESS == code) {
1416
      code = nodesListStrictAppend(pChildren, (SNode*)pChild);
X
Xiaoyu Wang 已提交
1417 1418 1419 1420 1421
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = doCreatePhysiNode(pCxt, pLogicNode, pSubplan, pChildren, pPhyNode);
X
Xiaoyu Wang 已提交
1422
  }
X
Xiaoyu Wang 已提交
1423

X
Xiaoyu Wang 已提交
1424
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
1425 1426 1427 1428 1429 1430 1431
    if (LIST_LENGTH(pChildren) > 0) {
      (*pPhyNode)->pChildren = pChildren;
      SNode* pChild;
      FOREACH(pChild, (*pPhyNode)->pChildren) { ((SPhysiNode*)pChild)->pParent = (*pPhyNode); }
    } else {
      nodesDestroyList(pChildren);
    }
X
Xiaoyu Wang 已提交
1432 1433
  } else {
    nodesDestroyList(pChildren);
X
Xiaoyu Wang 已提交
1434 1435
  }

X
Xiaoyu Wang 已提交
1436
  return code;
X
Xiaoyu Wang 已提交
1437 1438
}

X
Xiaoyu Wang 已提交
1439
static int32_t createDataInserter(SPhysiPlanContext* pCxt, SVgDataBlocks* pBlocks, SDataSinkNode** pSink) {
1440
  SDataInserterNode* pInserter = (SDataInserterNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_INSERT);
X
Xiaoyu Wang 已提交
1441 1442 1443 1444
  if (NULL == pInserter) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

1445 1446
  pInserter->numOfTables = pBlocks->numOfTables;
  pInserter->size = pBlocks->size;
wafwerar's avatar
wafwerar 已提交
1447
  TSWAP(pInserter->pData, pBlocks->pData);
X
Xiaoyu Wang 已提交
1448 1449 1450

  *pSink = (SDataSinkNode*)pInserter;
  return TSDB_CODE_SUCCESS;
1451 1452
}

X
Xiaoyu Wang 已提交
1453
static int32_t createDataDispatcher(SPhysiPlanContext* pCxt, const SPhysiNode* pRoot, SDataSinkNode** pSink) {
1454
  SDataDispatcherNode* pDispatcher = (SDataDispatcherNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_DISPATCH);
X
Xiaoyu Wang 已提交
1455 1456 1457 1458
  if (NULL == pDispatcher) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

1459
  pDispatcher->sink.pInputDataBlockDesc = (SDataBlockDescNode*)nodesCloneNode((SNode*)pRoot->pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
1460
  if (NULL == pDispatcher->sink.pInputDataBlockDesc) {
1461
    nodesDestroyNode((SNode*)pDispatcher);
X
Xiaoyu Wang 已提交
1462 1463 1464 1465 1466
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  *pSink = (SDataSinkNode*)pDispatcher;
  return TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
1467 1468
}

X
Xiaoyu Wang 已提交
1469
static SSubplan* makeSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan) {
1470
  SSubplan* pSubplan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
X
Xiaoyu Wang 已提交
1471 1472 1473
  if (NULL == pSubplan) {
    return NULL;
  }
X
Xiaoyu Wang 已提交
1474
  pSubplan->id = pLogicSubplan->id;
X
Xiaoyu Wang 已提交
1475 1476
  pSubplan->subplanType = pLogicSubplan->subplanType;
  pSubplan->level = pLogicSubplan->level;
X
Xiaoyu Wang 已提交
1477 1478 1479
  if (NULL != pCxt->pPlanCxt->pUser) {
    strcpy(pSubplan->user, pCxt->pPlanCxt->pUser);
  }
X
Xiaoyu Wang 已提交
1480 1481 1482
  return pSubplan;
}

X
Xiaoyu Wang 已提交
1483 1484 1485 1486 1487 1488 1489 1490
static int32_t buildInsertSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
  pSubplan->msgType = pModify->msgType;
  pSubplan->execNode.epSet = pModify->pVgDataBlocks->vg.epSet;
  return createDataInserter(pCxt, pModify->pVgDataBlocks, &pSubplan->pDataSink);
}

static int32_t createDataDeleter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, const SPhysiNode* pRoot,
                                 SDataSinkNode** pSink) {
1491
  SDataDeleterNode* pDeleter = (SDataDeleterNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_DELETE);
X
Xiaoyu Wang 已提交
1492 1493 1494 1495 1496 1497 1498 1499 1500
  if (NULL == pDeleter) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pDeleter->tableId = pModify->tableId;
  pDeleter->tableType = pModify->tableType;
  strcpy(pDeleter->tableFName, pModify->tableFName);
  pDeleter->deleteTimeRange = pModify->deleteTimeRange;

X
Xiaoyu Wang 已提交
1501 1502 1503
  int32_t code = setNodeSlotId(pCxt, pRoot->pOutputDataBlockDesc->dataBlockId, -1, pModify->pAffectedRows,
                               &pDeleter->pAffectedRows);
  if (TSDB_CODE_SUCCESS == code) {
1504
    pDeleter->sink.pInputDataBlockDesc = (SDataBlockDescNode*)nodesCloneNode((SNode*)pRoot->pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
1505 1506 1507 1508 1509 1510 1511 1512
    if (NULL == pDeleter->sink.pInputDataBlockDesc) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pSink = (SDataSinkNode*)pDeleter;
  } else {
1513
    nodesDestroyNode((SNode*)pDeleter);
X
Xiaoyu Wang 已提交
1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524
  }

  return TSDB_CODE_SUCCESS;
}

static int32_t buildDeleteSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
  int32_t code =
      createPhysiNode(pCxt, (SLogicNode*)nodesListGetNode(pModify->node.pChildren, 0), pSubplan, &pSubplan->pNode);
  if (TSDB_CODE_SUCCESS == code) {
    code = createDataDeleter(pCxt, pModify, pSubplan->pNode, &pSubplan->pDataSink);
  }
D
dapan1121 已提交
1525
  pSubplan->msgType = TDMT_VND_DELETE;
X
Xiaoyu Wang 已提交
1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545
  return code;
}

static int32_t buildVnodeModifySubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SSubplan* pSubplan) {
  int32_t                code = TSDB_CODE_SUCCESS;
  SVnodeModifyLogicNode* pModify = (SVnodeModifyLogicNode*)pLogicSubplan->pNode;
  switch (pModify->modifyType) {
    case MODIFY_TABLE_TYPE_INSERT:
      code = buildInsertSubplan(pCxt, pModify, pSubplan);
      break;
    case MODIFY_TABLE_TYPE_DELETE:
      code = buildDeleteSubplan(pCxt, pModify, pSubplan);
      break;
    default:
      code = TSDB_CODE_FAILED;
      break;
  }
  return code;
}

X
Xiaoyu Wang 已提交
1546
static int32_t createPhysiSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SSubplan** pPhysiSubplan) {
X
Xiaoyu Wang 已提交
1547
  SSubplan* pSubplan = makeSubplan(pCxt, pLogicSubplan);
X
Xiaoyu Wang 已提交
1548 1549 1550 1551 1552 1553
  if (NULL == pSubplan) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;

1554
  if (SUBPLAN_TYPE_MODIFY == pLogicSubplan->subplanType) {
X
Xiaoyu Wang 已提交
1555
    code = buildVnodeModifySubplan(pCxt, pLogicSubplan, pSubplan);
1556
  } else {
D
dapan1121 已提交
1557 1558 1559 1560 1561
    if (SUBPLAN_TYPE_SCAN == pSubplan->subplanType) {
      pSubplan->msgType = TDMT_SCH_QUERY;
    } else {
      pSubplan->msgType = TDMT_SCH_MERGE_QUERY;
    }
X
Xiaoyu Wang 已提交
1562 1563 1564 1565
    code = createPhysiNode(pCxt, pLogicSubplan->pNode, pSubplan, &pSubplan->pNode);
    if (TSDB_CODE_SUCCESS == code && !pCxt->pPlanCxt->streamQuery && !pCxt->pPlanCxt->topicQuery) {
      code = createDataDispatcher(pCxt, pSubplan->pNode, &pSubplan->pDataSink);
    }
1566
  }
X
Xiaoyu Wang 已提交
1567

X
Xiaoyu Wang 已提交
1568 1569 1570
  if (TSDB_CODE_SUCCESS == code) {
    *pPhysiSubplan = pSubplan;
  } else {
1571
    nodesDestroyNode((SNode*)pSubplan);
X
Xiaoyu Wang 已提交
1572
  }
X
Xiaoyu Wang 已提交
1573

X
Xiaoyu Wang 已提交
1574
  return code;
X
Xiaoyu Wang 已提交
1575 1576
}

X
Xiaoyu Wang 已提交
1577
static SQueryPlan* makeQueryPhysiPlan(SPhysiPlanContext* pCxt) {
1578
  SQueryPlan* pPlan = (SQueryPlan*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN);
X
Xiaoyu Wang 已提交
1579 1580 1581 1582 1583
  if (NULL == pPlan) {
    return NULL;
  }
  pPlan->pSubplans = nodesMakeList();
  if (NULL == pPlan->pSubplans) {
1584
    nodesDestroyNode((SNode*)pPlan);
X
Xiaoyu Wang 已提交
1585
    return NULL;
1586
  }
X
Xiaoyu Wang 已提交
1587 1588
  pPlan->queryId = pCxt->pPlanCxt->queryId;
  return pPlan;
1589 1590
}

1591 1592
static int32_t pushSubplan(SPhysiPlanContext* pCxt, SNode* pSubplan, int32_t level, SNodeList* pSubplans) {
  SNodeListNode* pGroup = NULL;
1593
  if (level >= LIST_LENGTH(pSubplans)) {
1594
    pGroup = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST);
X
bugfix  
Xiaoyu Wang 已提交
1595 1596 1597
    if (NULL == pGroup) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
1598
    if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pSubplans, (SNode*)pGroup)) {
X
bugfix  
Xiaoyu Wang 已提交
1599 1600
      return TSDB_CODE_OUT_OF_MEMORY;
    }
1601
  } else {
1602
    pGroup = (SNodeListNode*)nodesListGetNode(pSubplans, level);
1603 1604 1605
  }
  if (NULL == pGroup->pNodeList) {
    pGroup->pNodeList = nodesMakeList();
X
bugfix  
Xiaoyu Wang 已提交
1606 1607 1608
    if (NULL == pGroup->pNodeList) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
1609
  }
1610
  return nodesListStrictAppend(pGroup->pNodeList, (SNode*)pSubplan);
1611 1612
}

X
Xiaoyu Wang 已提交
1613 1614
static int32_t buildPhysiPlan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SSubplan* pParent,
                              SQueryPlan* pQueryPlan) {
X
Xiaoyu Wang 已提交
1615
  SSubplan* pSubplan = NULL;
X
Xiaoyu Wang 已提交
1616
  int32_t   code = createPhysiSubplan(pCxt, pLogicSubplan, &pSubplan);
X
Xiaoyu Wang 已提交
1617

X
Xiaoyu Wang 已提交
1618
  if (TSDB_CODE_SUCCESS == code) {
1619
    code = pushSubplan(pCxt, (SNode*)pSubplan, pLogicSubplan->level, pQueryPlan->pSubplans);
X
Xiaoyu Wang 已提交
1620
    ++(pQueryPlan->numOfSubplans);
1621 1622
  }

1623
  if (TSDB_CODE_SUCCESS != code) {
1624
    nodesDestroyNode((SNode*)pSubplan);
1625 1626 1627
    return code;
  }

X
Xiaoyu Wang 已提交
1628
  if (TSDB_CODE_SUCCESS == code && NULL != pParent) {
1629
    code = nodesListMakeAppend(&pParent->pChildren, (SNode*)pSubplan);
X
Xiaoyu Wang 已提交
1630
    if (TSDB_CODE_SUCCESS == code) {
1631
      code = nodesListMakeAppend(&pSubplan->pParents, (SNode*)pParent);
X
Xiaoyu Wang 已提交
1632 1633
    }
  }
X
Xiaoyu Wang 已提交
1634

X
Xiaoyu Wang 已提交
1635 1636 1637 1638 1639 1640
  if (TSDB_CODE_SUCCESS == code) {
    SNode* pChild = NULL;
    FOREACH(pChild, pLogicSubplan->pChildren) {
      code = buildPhysiPlan(pCxt, (SLogicSubplan*)pChild, pSubplan, pQueryPlan);
      if (TSDB_CODE_SUCCESS != code) {
        break;
X
Xiaoyu Wang 已提交
1641 1642 1643
      }
    }
  }
X
Xiaoyu Wang 已提交
1644

X
Xiaoyu Wang 已提交
1645
  return code;
1646 1647
}

X
Xiaoyu Wang 已提交
1648
static int32_t doCreatePhysiPlan(SPhysiPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPhysiPlan) {
1649
  SQueryPlan* pPlan = (SQueryPlan*)makeQueryPhysiPlan(pCxt);
X
Xiaoyu Wang 已提交
1650 1651
  if (NULL == pPlan) {
    return TSDB_CODE_OUT_OF_MEMORY;
1652
  }
X
Xiaoyu Wang 已提交
1653

X
Xiaoyu Wang 已提交
1654
  int32_t code = TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
1655

X
Xiaoyu Wang 已提交
1656 1657 1658 1659 1660 1661
  SNode* pSubplan = NULL;
  FOREACH(pSubplan, pLogicPlan->pTopSubplans) {
    code = buildPhysiPlan(pCxt, (SLogicSubplan*)pSubplan, NULL, pPlan);
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
X
Xiaoyu Wang 已提交
1662 1663
  }

X
Xiaoyu Wang 已提交
1664 1665 1666
  if (TSDB_CODE_SUCCESS == code) {
    *pPhysiPlan = pPlan;
  } else {
1667
    nodesDestroyNode((SNode*)pPlan);
X
Xiaoyu Wang 已提交
1668 1669
  }

X
Xiaoyu Wang 已提交
1670
  return code;
X
Xiaoyu Wang 已提交
1671
}
X
Xiaoyu Wang 已提交
1672

X
Xiaoyu Wang 已提交
1673
static void destoryLocationHash(void* p) {
X
Xiaoyu Wang 已提交
1674
  SHashObj*   pHash = *(SHashObj**)p;
X
bugfix  
Xiaoyu Wang 已提交
1675 1676 1677 1678 1679
  SSlotIndex* pIndex = taosHashIterate(pHash, NULL);
  while (NULL != pIndex) {
    taosArrayDestroy(pIndex->pSlotIdsInfo);
    pIndex = taosHashIterate(pHash, pIndex);
  }
X
Xiaoyu Wang 已提交
1680 1681 1682 1683 1684 1685 1686
  taosHashCleanup(pHash);
}

static void destoryPhysiPlanContext(SPhysiPlanContext* pCxt) {
  taosArrayDestroyEx(pCxt->pLocationHelper, destoryLocationHash);
}

1687 1688 1689 1690 1691
static void setExplainInfo(SPlanContext* pCxt, SQueryPlan* pPlan) {
  if (QUERY_NODE_EXPLAIN_STMT == nodeType(pCxt->pAstRoot)) {
    SExplainStmt* pStmt = (SExplainStmt*)pCxt->pAstRoot;
    pPlan->explainInfo.mode = pStmt->analyze ? EXPLAIN_MODE_ANALYZE : EXPLAIN_MODE_STATIC;
    pPlan->explainInfo.verbose = pStmt->pOptions->verbose;
1692
    pPlan->explainInfo.ratio = pStmt->pOptions->ratio;
1693 1694 1695 1696 1697
  } else {
    pPlan->explainInfo.mode = EXPLAIN_MODE_DISABLE;
  }
}

X
Xiaoyu Wang 已提交
1698
int32_t createPhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan, SArray* pExecNodeList) {
X
Xiaoyu Wang 已提交
1699 1700 1701 1702 1703
  SPhysiPlanContext cxt = {.pPlanCxt = pCxt,
                           .errCode = TSDB_CODE_SUCCESS,
                           .nextDataBlockId = 0,
                           .pLocationHelper = taosArrayInit(32, POINTER_BYTES),
                           .pExecNodeList = pExecNodeList};
X
Xiaoyu Wang 已提交
1704 1705 1706
  if (NULL == cxt.pLocationHelper) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
1707

1708 1709
  if (QUERY_POLICY_VNODE == tsQueryPolicy) {
    taosArrayClear(pExecNodeList);
X
Xiaoyu Wang 已提交
1710
  }
1711 1712

  int32_t code = doCreatePhysiPlan(&cxt, pLogicPlan, pPlan);
1713 1714 1715 1716
  if (TSDB_CODE_SUCCESS == code) {
    setExplainInfo(pCxt, *pPlan);
  }

X
Xiaoyu Wang 已提交
1717 1718
  destoryPhysiPlanContext(&cxt);
  return code;
X
Xiaoyu Wang 已提交
1719
}