planPhysiCreater.c 58.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

X
Xiaoyu Wang 已提交
16
#include "planInt.h"
X
Xiaoyu Wang 已提交
17

X
Xiaoyu Wang 已提交
18
#include "catalog.h"
X
Xiaoyu Wang 已提交
19
#include "functionMgt.h"
20
#include "systable.h"
X
Xiaoyu Wang 已提交
21
#include "tglobal.h"
X
Xiaoyu Wang 已提交
22

X
bugfix  
Xiaoyu Wang 已提交
23 24
typedef struct SSlotIdInfo {
  int16_t slotId;
X
Xiaoyu Wang 已提交
25
  bool    set;
X
bugfix  
Xiaoyu Wang 已提交
26 27
} SSlotIdInfo;

X
Xiaoyu Wang 已提交
28 29
typedef struct SSlotIndex {
  int16_t dataBlockId;
X
Xiaoyu Wang 已提交
30
  SArray* pSlotIdsInfo;  // duplicate name slot
X
Xiaoyu Wang 已提交
31 32 33
} SSlotIndex;

typedef struct SPhysiPlanContext {
X
Xiaoyu Wang 已提交
34
  SPlanContext* pPlanCxt;
X
Xiaoyu Wang 已提交
35 36 37
  int32_t       errCode;
  int16_t       nextDataBlockId;
  SArray*       pLocationHelper;
38
  SArray*       pExecNodeList;  // SArray<SQueryNodeLoad>
X
Xiaoyu Wang 已提交
39 40
} SPhysiPlanContext;

X
Xiaoyu Wang 已提交
41
static int32_t getSlotKey(SNode* pNode, const char* pStmtName, char* pKey) {
X
Xiaoyu Wang 已提交
42 43
  if (QUERY_NODE_COLUMN == nodeType(pNode)) {
    SColumnNode* pCol = (SColumnNode*)pNode;
X
Xiaoyu Wang 已提交
44
    if (NULL != pStmtName && '\0' != pStmtName[0]) {
X
Xiaoyu Wang 已提交
45 46
      return sprintf(pKey, "%s.%s", pStmtName, pCol->node.aliasName);
    }
X
Xiaoyu Wang 已提交
47 48 49 50 51
    if ('\0' == pCol->tableAlias[0]) {
      return sprintf(pKey, "%s", pCol->colName);
    }
    return sprintf(pKey, "%s.%s", pCol->tableAlias, pCol->colName);
  }
X
Xiaoyu Wang 已提交
52

X
Xiaoyu Wang 已提交
53
  if (NULL != pStmtName && '\0' != pStmtName[0]) {
X
Xiaoyu Wang 已提交
54 55
    return sprintf(pKey, "%s.%s", pStmtName, ((SExprNode*)pNode)->aliasName);
  }
X
Xiaoyu Wang 已提交
56 57 58
  return sprintf(pKey, "%s", ((SExprNode*)pNode)->aliasName);
}

X
Xiaoyu Wang 已提交
59
static SNode* createSlotDesc(SPhysiPlanContext* pCxt, const SNode* pNode, int16_t slotId, bool output, bool reserve) {
X
Xiaoyu Wang 已提交
60
  SSlotDescNode* pSlot = (SSlotDescNode*)nodesMakeNode(QUERY_NODE_SLOT_DESC);
X
bugfix  
Xiaoyu Wang 已提交
61 62 63
  if (NULL == pSlot) {
    return NULL;
  }
X
Xiaoyu Wang 已提交
64 65
  pSlot->slotId = slotId;
  pSlot->dataType = ((SExprNode*)pNode)->resType;
X
Xiaoyu Wang 已提交
66
  pSlot->reserve = reserve;
X
bugfix  
Xiaoyu Wang 已提交
67
  pSlot->output = output;
X
Xiaoyu Wang 已提交
68 69 70
  return (SNode*)pSlot;
}

X
bugfix  
Xiaoyu Wang 已提交
71
static int32_t createTarget(SNode* pNode, int16_t dataBlockId, int16_t slotId, SNode** pOutput) {
X
Xiaoyu Wang 已提交
72 73
  STargetNode* pTarget = (STargetNode*)nodesMakeNode(QUERY_NODE_TARGET);
  if (NULL == pTarget) {
X
bugfix  
Xiaoyu Wang 已提交
74
    return TSDB_CODE_OUT_OF_MEMORY;
X
Xiaoyu Wang 已提交
75
  }
X
bugfix  
Xiaoyu Wang 已提交
76

X
Xiaoyu Wang 已提交
77 78 79
  pTarget->dataBlockId = dataBlockId;
  pTarget->slotId = slotId;
  pTarget->pExpr = pNode;
X
bugfix  
Xiaoyu Wang 已提交
80 81 82

  *pOutput = (SNode*)pTarget;
  return TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
83 84
}

X
bugfix  
Xiaoyu Wang 已提交
85
static int32_t putSlotToHashImpl(int16_t dataBlockId, int16_t slotId, const char* pName, int32_t len, SHashObj* pHash) {
X
bugfix  
Xiaoyu Wang 已提交
86 87
  SSlotIndex* pIndex = taosHashGet(pHash, pName, len);
  if (NULL != pIndex) {
X
Xiaoyu Wang 已提交
88
    SSlotIdInfo info = {.slotId = slotId, .set = false};
X
bugfix  
Xiaoyu Wang 已提交
89 90 91 92
    taosArrayPush(pIndex->pSlotIdsInfo, &info);
    return TSDB_CODE_SUCCESS;
  }

X
Xiaoyu Wang 已提交
93
  SSlotIndex index = {.dataBlockId = dataBlockId, .pSlotIdsInfo = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SSlotIdInfo))};
X
bugfix  
Xiaoyu Wang 已提交
94 95 96
  if (NULL == index.pSlotIdsInfo) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
97
  SSlotIdInfo info = {.slotId = slotId, .set = false};
X
bugfix  
Xiaoyu Wang 已提交
98
  taosArrayPush(index.pSlotIdsInfo, &info);
X
bugfix  
Xiaoyu Wang 已提交
99 100 101 102
  return taosHashPut(pHash, pName, len, &index, sizeof(SSlotIndex));
}

static int32_t putSlotToHash(int16_t dataBlockId, int16_t slotId, SNode* pNode, SHashObj* pHash) {
X
Xiaoyu Wang 已提交
103
  char    name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN];
X
Xiaoyu Wang 已提交
104
  int32_t len = getSlotKey(pNode, NULL, name);
X
bugfix  
Xiaoyu Wang 已提交
105 106 107
  return putSlotToHashImpl(dataBlockId, slotId, name, len, pHash);
}

X
Xiaoyu Wang 已提交
108 109
static int32_t createDataBlockDescHash(SPhysiPlanContext* pCxt, int32_t capacity, int16_t dataBlockId,
                                       SHashObj** pDescHash) {
X
bugfix  
Xiaoyu Wang 已提交
110
  SHashObj* pHash = taosHashInit(capacity, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
X
bugfix  
Xiaoyu Wang 已提交
111 112 113 114 115 116 117 118 119 120 121 122
  if (NULL == pHash) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  if (NULL == taosArrayInsert(pCxt->pLocationHelper, dataBlockId, &pHash)) {
    taosHashCleanup(pHash);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  *pDescHash = pHash;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
123 124
static int32_t buildDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc,
                                   SHashObj* pHash) {
X
bugfix  
Xiaoyu Wang 已提交
125
  pDataBlockDesc->pSlots = nodesMakeList();
X
Xiaoyu Wang 已提交
126
  if (NULL == pDataBlockDesc->pSlots) {
X
bugfix  
Xiaoyu Wang 已提交
127 128
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
129

X
bugfix  
Xiaoyu Wang 已提交
130 131
  int32_t code = TSDB_CODE_SUCCESS;
  int16_t slotId = 0;
X
Xiaoyu Wang 已提交
132
  SNode*  pNode = NULL;
X
bugfix  
Xiaoyu Wang 已提交
133
  FOREACH(pNode, pList) {
X
Xiaoyu Wang 已提交
134
    code = nodesListStrictAppend(pDataBlockDesc->pSlots, createSlotDesc(pCxt, pNode, slotId, true, false));
X
bugfix  
Xiaoyu Wang 已提交
135 136 137 138
    if (TSDB_CODE_SUCCESS == code) {
      code = putSlotToHash(pDataBlockDesc->dataBlockId, slotId, pNode, pHash);
    }
    if (TSDB_CODE_SUCCESS == code) {
139 140
      pDataBlockDesc->totalRowSize += ((SExprNode*)pNode)->resType.bytes;
      pDataBlockDesc->outputRowSize += ((SExprNode*)pNode)->resType.bytes;
X
bugfix  
Xiaoyu Wang 已提交
141 142 143
      ++slotId;
    } else {
      break;
X
Xiaoyu Wang 已提交
144
    }
X
bugfix  
Xiaoyu Wang 已提交
145 146 147 148 149
  }
  return code;
}

static int32_t createDataBlockDesc(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode** pDataBlockDesc) {
150
  SDataBlockDescNode* pDesc = (SDataBlockDescNode*)nodesMakeNode(QUERY_NODE_DATABLOCK_DESC);
X
bugfix  
Xiaoyu Wang 已提交
151 152 153 154 155 156
  if (NULL == pDesc) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pDesc->dataBlockId = pCxt->nextDataBlockId++;

  SHashObj* pHash = NULL;
X
Xiaoyu Wang 已提交
157
  int32_t   code = createDataBlockDescHash(pCxt, LIST_LENGTH(pList), pDesc->dataBlockId, &pHash);
X
bugfix  
Xiaoyu Wang 已提交
158 159 160 161 162 163
  if (TSDB_CODE_SUCCESS == code) {
    code = buildDataBlockSlots(pCxt, pList, pDesc, pHash);
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pDataBlockDesc = pDesc;
X
Xiaoyu Wang 已提交
164
  } else {
165
    nodesDestroyNode((SNode*)pDesc);
X
Xiaoyu Wang 已提交
166
  }
X
bugfix  
Xiaoyu Wang 已提交
167 168 169 170

  return code;
}

X
bugfix  
Xiaoyu Wang 已提交
171 172 173 174 175 176 177 178 179 180 181 182
static int16_t getUnsetSlotId(const SArray* pSlotIdsInfo) {
  int32_t size = taosArrayGetSize(pSlotIdsInfo);
  for (int32_t i = 0; i < size; ++i) {
    SSlotIdInfo* pInfo = taosArrayGet(pSlotIdsInfo, i);
    if (!pInfo->set) {
      pInfo->set = true;
      return pInfo->slotId;
    }
  }
  return ((SSlotIdInfo*)taosArrayGet(pSlotIdsInfo, 0))->slotId;
}

X
Xiaoyu Wang 已提交
183
static int32_t addDataBlockSlotsImpl(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc,
X
Xiaoyu Wang 已提交
184
                                     const char* pStmtName, bool output, bool reserve) {
185 186 187 188
  if (NULL == pList) {
    return TSDB_CODE_SUCCESS;
  }

X
Xiaoyu Wang 已提交
189
  int32_t   code = TSDB_CODE_SUCCESS;
X
bugfix  
Xiaoyu Wang 已提交
190
  SHashObj* pHash = taosArrayGetP(pCxt->pLocationHelper, pDataBlockDesc->dataBlockId);
X
Xiaoyu Wang 已提交
191 192
  int16_t   nextSlotId = taosHashGetSize(pHash), slotId = 0;
  SNode*    pNode = NULL;
X
Xiaoyu Wang 已提交
193
  FOREACH(pNode, pList) {
X
Xiaoyu Wang 已提交
194 195 196
    SNode*      pExpr = QUERY_NODE_ORDER_BY_EXPR == nodeType(pNode) ? ((SOrderByExprNode*)pNode)->pExpr : pNode;
    char        name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN] = {0};
    int32_t     len = getSlotKey(pExpr, pStmtName, name);
X
bugfix  
Xiaoyu Wang 已提交
197 198
    SSlotIndex* pIndex = taosHashGet(pHash, name, len);
    if (NULL == pIndex) {
X
Xiaoyu Wang 已提交
199
      code = nodesListStrictAppend(pDataBlockDesc->pSlots, createSlotDesc(pCxt, pExpr, nextSlotId, output, reserve));
X
bugfix  
Xiaoyu Wang 已提交
200 201 202
      if (TSDB_CODE_SUCCESS == code) {
        code = putSlotToHashImpl(pDataBlockDesc->dataBlockId, nextSlotId, name, len, pHash);
      }
203 204 205 206
      pDataBlockDesc->totalRowSize += ((SExprNode*)pExpr)->resType.bytes;
      if (output) {
        pDataBlockDesc->outputRowSize += ((SExprNode*)pExpr)->resType.bytes;
      }
X
bugfix  
Xiaoyu Wang 已提交
207 208 209
      slotId = nextSlotId;
      ++nextSlotId;
    } else {
X
bugfix  
Xiaoyu Wang 已提交
210
      slotId = getUnsetSlotId(pIndex->pSlotIdsInfo);
X
bugfix  
Xiaoyu Wang 已提交
211
    }
212

X
bugfix  
Xiaoyu Wang 已提交
213 214 215 216 217 218 219
    if (TSDB_CODE_SUCCESS == code) {
      SNode* pTarget = NULL;
      code = createTarget(pNode, pDataBlockDesc->dataBlockId, slotId, &pTarget);
      if (TSDB_CODE_SUCCESS == code) {
        REPLACE_NODE(pTarget);
      }
    }
X
Xiaoyu Wang 已提交
220

X
bugfix  
Xiaoyu Wang 已提交
221 222 223
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
X
Xiaoyu Wang 已提交
224
  }
X
bugfix  
Xiaoyu Wang 已提交
225 226 227 228
  return code;
}

static int32_t addDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) {
X
Xiaoyu Wang 已提交
229
  return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, NULL, false, false);
X
Xiaoyu Wang 已提交
230 231
}

232 233 234 235 236 237
static int32_t addDataBlockSlot(SPhysiPlanContext* pCxt, SNode** pNode, SDataBlockDescNode* pDataBlockDesc) {
  if (NULL == pNode || NULL == *pNode) {
    return TSDB_CODE_SUCCESS;
  }

  SNodeList* pList = NULL;
X
Xiaoyu Wang 已提交
238
  int32_t    code = nodesListMakeAppend(&pList, *pNode);
239 240 241 242 243 244 245 246 247 248
  if (TSDB_CODE_SUCCESS == code) {
    code = addDataBlockSlots(pCxt, pList, pDataBlockDesc);
  }
  if (TSDB_CODE_SUCCESS == code) {
    *pNode = nodesListGetNode(pList, 0);
  }
  nodesClearList(pList);
  return code;
}

X
Xiaoyu Wang 已提交
249 250
static int32_t addDataBlockSlotsForProject(SPhysiPlanContext* pCxt, const char* pStmtName, SNodeList* pList,
                                           SDataBlockDescNode* pDataBlockDesc) {
X
Xiaoyu Wang 已提交
251
  return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, pStmtName, true, false);
X
bugfix  
Xiaoyu Wang 已提交
252 253 254
}

static int32_t pushdownDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) {
X
Xiaoyu Wang 已提交
255
  return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, NULL, true, true);
X
Xiaoyu Wang 已提交
256 257 258
}

typedef struct SSetSlotIdCxt {
X
Xiaoyu Wang 已提交
259
  int32_t   errCode;
X
Xiaoyu Wang 已提交
260 261 262 263
  SHashObj* pLeftHash;
  SHashObj* pRightHash;
} SSetSlotIdCxt;

264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279
static void dumpSlots(const char* pName, SHashObj* pHash) {
  if (NULL == pHash) {
    return;
  }
  planDebug("%s", pName);
  void* pIt = taosHashIterate(pHash, NULL);
  while (NULL != pIt) {
    size_t len = 0;
    char*  pKey = taosHashGetKey(pIt, &len);
    char   name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN] = {0};
    strncpy(name, pKey, len);
    planDebug("\tslot name = %s", name);
    pIt = taosHashIterate(pHash, pIt);
  }
}

X
Xiaoyu Wang 已提交
280 281 282
static EDealRes doSetSlotId(SNode* pNode, void* pContext) {
  if (QUERY_NODE_COLUMN == nodeType(pNode) && 0 != strcmp(((SColumnNode*)pNode)->colName, "*")) {
    SSetSlotIdCxt* pCxt = (SSetSlotIdCxt*)pContext;
X
Xiaoyu Wang 已提交
283 284 285
    char           name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN];
    int32_t        len = getSlotKey(pNode, NULL, name);
    SSlotIndex*    pIndex = taosHashGet(pCxt->pLeftHash, name, len);
X
Xiaoyu Wang 已提交
286 287 288 289
    if (NULL == pIndex) {
      pIndex = taosHashGet(pCxt->pRightHash, name, len);
    }
    // pIndex is definitely not NULL, otherwise it is a bug
X
bugfix  
Xiaoyu Wang 已提交
290
    if (NULL == pIndex) {
291
      planError("doSetSlotId failed, invalid slot name %s", name);
292 293
      dumpSlots("left datablock desc", pCxt->pLeftHash);
      dumpSlots("right datablock desc", pCxt->pRightHash);
294
      pCxt->errCode = TSDB_CODE_PLAN_INTERNAL_ERROR;
X
bugfix  
Xiaoyu Wang 已提交
295 296
      return DEAL_RES_ERROR;
    }
X
Xiaoyu Wang 已提交
297
    ((SColumnNode*)pNode)->dataBlockId = pIndex->dataBlockId;
X
bugfix  
Xiaoyu Wang 已提交
298
    ((SColumnNode*)pNode)->slotId = ((SSlotIdInfo*)taosArrayGet(pIndex->pSlotIdsInfo, 0))->slotId;
X
Xiaoyu Wang 已提交
299 300 301 302 303
    return DEAL_RES_IGNORE_CHILD;
  }
  return DEAL_RES_CONTINUE;
}

X
Xiaoyu Wang 已提交
304 305
static int32_t setNodeSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, int16_t rightDataBlockId, SNode* pNode,
                             SNode** pOutput) {
X
Xiaoyu Wang 已提交
306
  SNode* pRes = nodesCloneNode(pNode);
X
Xiaoyu Wang 已提交
307 308 309 310 311
  if (NULL == pRes) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SSetSlotIdCxt cxt = {
X
Xiaoyu Wang 已提交
312 313 314
      .errCode = TSDB_CODE_SUCCESS,
      .pLeftHash = taosArrayGetP(pCxt->pLocationHelper, leftDataBlockId),
      .pRightHash = (rightDataBlockId < 0 ? NULL : taosArrayGetP(pCxt->pLocationHelper, rightDataBlockId))};
X
Xiaoyu Wang 已提交
315
  nodesWalkExpr(pRes, doSetSlotId, &cxt);
X
Xiaoyu Wang 已提交
316 317
  if (TSDB_CODE_SUCCESS != cxt.errCode) {
    nodesDestroyNode(pRes);
X
Xiaoyu Wang 已提交
318
    return cxt.errCode;
X
Xiaoyu Wang 已提交
319
  }
X
Xiaoyu Wang 已提交
320 321 322

  *pOutput = pRes;
  return TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
323 324
}

X
Xiaoyu Wang 已提交
325 326
static int32_t setListSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, int16_t rightDataBlockId,
                             const SNodeList* pList, SNodeList** pOutput) {
X
Xiaoyu Wang 已提交
327
  SNodeList* pRes = nodesCloneList(pList);
X
Xiaoyu Wang 已提交
328 329 330 331 332
  if (NULL == pRes) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SSetSlotIdCxt cxt = {
X
Xiaoyu Wang 已提交
333 334 335
      .errCode = TSDB_CODE_SUCCESS,
      .pLeftHash = taosArrayGetP(pCxt->pLocationHelper, leftDataBlockId),
      .pRightHash = (rightDataBlockId < 0 ? NULL : taosArrayGetP(pCxt->pLocationHelper, rightDataBlockId))};
X
Xiaoyu Wang 已提交
336
  nodesWalkExprs(pRes, doSetSlotId, &cxt);
X
Xiaoyu Wang 已提交
337 338
  if (TSDB_CODE_SUCCESS != cxt.errCode) {
    nodesDestroyList(pRes);
X
Xiaoyu Wang 已提交
339
    return cxt.errCode;
X
Xiaoyu Wang 已提交
340
  }
X
Xiaoyu Wang 已提交
341 342
  *pOutput = pRes;
  return TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
343 344
}

X
Xiaoyu Wang 已提交
345
static SPhysiNode* makePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, ENodeType type) {
X
Xiaoyu Wang 已提交
346
  SPhysiNode* pPhysiNode = (SPhysiNode*)nodesMakeNode(type);
X
Xiaoyu Wang 已提交
347 348 349
  if (NULL == pPhysiNode) {
    return NULL;
  }
X
bugfix  
Xiaoyu Wang 已提交
350 351 352

  int32_t code = createDataBlockDesc(pCxt, pLogicNode->pTargets, &pPhysiNode->pOutputDataBlockDesc);
  if (TSDB_CODE_SUCCESS != code) {
353
    nodesDestroyNode((SNode*)pPhysiNode);
X
Xiaoyu Wang 已提交
354 355
    return NULL;
  }
X
Xiaoyu Wang 已提交
356
  pPhysiNode->pOutputDataBlockDesc->precision = pLogicNode->precision;
X
Xiaoyu Wang 已提交
357 358 359 360 361
  return pPhysiNode;
}

static int32_t setConditionsSlotId(SPhysiPlanContext* pCxt, const SLogicNode* pLogicNode, SPhysiNode* pPhysiNode) {
  if (NULL != pLogicNode->pConditions) {
X
Xiaoyu Wang 已提交
362 363
    return setNodeSlotId(pCxt, pPhysiNode->pOutputDataBlockDesc->dataBlockId, -1, pLogicNode->pConditions,
                         &pPhysiNode->pConditions);
X
Xiaoyu Wang 已提交
364 365 366 367
  }
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
368 369 370 371 372 373 374 375 376 377 378 379 380
static int32_t colIdCompare(const void* pLeft, const void* pRight) {
  SColumnNode* pLeftCol = *(SColumnNode**)pLeft;
  SColumnNode* pRightCol = *(SColumnNode**)pRight;
  return pLeftCol->colId > pRightCol->colId ? 1 : -1;
}

static int32_t sortScanCols(SNodeList* pScanCols) {
  SArray* pArray = taosArrayInit(LIST_LENGTH(pScanCols), POINTER_BYTES);
  if (NULL == pArray) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNode* pCol = NULL;
X
Xiaoyu Wang 已提交
381
  FOREACH(pCol, pScanCols) { taosArrayPush(pArray, &pCol); }
X
Xiaoyu Wang 已提交
382 383 384
  taosArraySort(pArray, colIdCompare);

  int32_t index = 0;
X
Xiaoyu Wang 已提交
385
  FOREACH(pCol, pScanCols) { REPLACE_NODE(taosArrayGetP(pArray, index++)); }
X
Xiaoyu Wang 已提交
386 387 388 389 390
  taosArrayDestroy(pArray);

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
391
static int32_t createScanCols(SPhysiPlanContext* pCxt, SScanPhysiNode* pScanPhysiNode, SNodeList* pScanCols) {
392 393 394 395
  if (NULL == pScanCols) {
    return TSDB_CODE_SUCCESS;
  }

396 397 398
  pScanPhysiNode->pScanCols = nodesCloneList(pScanCols);
  if (NULL == pScanPhysiNode->pScanCols) {
    return TSDB_CODE_OUT_OF_MEMORY;
399
  }
X
bugfix  
Xiaoyu Wang 已提交
400
  return sortScanCols(pScanPhysiNode->pScanCols);
401 402
}

X
Xiaoyu Wang 已提交
403
static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
X
Xiaoyu Wang 已提交
404
                                           SScanPhysiNode* pScanPhysiNode, SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
405 406 407
  int32_t code = createScanCols(pCxt, pScanPhysiNode, pScanLogicNode->pScanCols);
  if (TSDB_CODE_SUCCESS == code) {
    // Data block describe also needs to be set without scanning column, such as SELECT COUNT(*) FROM t
X
bugfix  
Xiaoyu Wang 已提交
408
    code = addDataBlockSlots(pCxt, pScanPhysiNode->pScanCols, pScanPhysiNode->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
409
  }
410 411 412 413 414 415 416 417 418 419 420 421

  if (TSDB_CODE_SUCCESS == code && NULL != pScanLogicNode->pScanPseudoCols) {
    pScanPhysiNode->pScanPseudoCols = nodesCloneList(pScanLogicNode->pScanPseudoCols);
    if (NULL == pScanPhysiNode->pScanPseudoCols) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = addDataBlockSlots(pCxt, pScanPhysiNode->pScanPseudoCols, pScanPhysiNode->node.pOutputDataBlockDesc);
  }

X
Xiaoyu Wang 已提交
422 423 424
  if (TSDB_CODE_SUCCESS == code) {
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pScanLogicNode, (SPhysiNode*)pScanPhysiNode);
  }
425

X
Xiaoyu Wang 已提交
426
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
427
    pScanPhysiNode->uid = pScanLogicNode->tableId;
X
Xiaoyu Wang 已提交
428
    pScanPhysiNode->suid = pScanLogicNode->stableId;
X
Xiaoyu Wang 已提交
429
    pScanPhysiNode->tableType = pScanLogicNode->tableType;
X
Xiaoyu Wang 已提交
430
    memcpy(&pScanPhysiNode->tableName, &pScanLogicNode->tableName, sizeof(SName));
X
Xiaoyu Wang 已提交
431 432 433 434 435 436
    if (NULL != pScanLogicNode->pTagCond) {
      pSubplan->pTagCond = nodesCloneNode(pScanLogicNode->pTagCond);
      if (NULL == pSubplan->pTagCond) {
        code = TSDB_CODE_OUT_OF_MEMORY;
      }
    }
X
Xiaoyu Wang 已提交
437
  }
X
Xiaoyu Wang 已提交
438

X
Xiaoyu Wang 已提交
439 440 441
  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pScanPhysiNode;
  } else {
442
    nodesDestroyNode((SNode*)pScanPhysiNode);
X
Xiaoyu Wang 已提交
443
  }
X
Xiaoyu Wang 已提交
444

X
Xiaoyu Wang 已提交
445
  return code;
X
Xiaoyu Wang 已提交
446 447
}

448 449
static void vgroupInfoToNodeAddr(const SVgroupInfo* vg, SQueryNodeAddr* pNodeAddr) {
  pNodeAddr->nodeId = vg->vgId;
X
Xiaoyu Wang 已提交
450
  pNodeAddr->epSet = vg->epSet;
451 452
}

453 454
static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
                                      SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
455 456
  STagScanPhysiNode* pTagScan =
      (STagScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN);
X
Xiaoyu Wang 已提交
457 458 459
  if (NULL == pTagScan) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
460
  vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
D
dapan1121 已提交
461
  SQueryNodeLoad node = {.addr = pSubplan->execNode, .load = 0};
D
dapan1121 已提交
462
  taosArrayPush(pCxt->pExecNodeList, &node);
X
Xiaoyu Wang 已提交
463
  return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTagScan, pPhyNode);
X
Xiaoyu Wang 已提交
464 465
}

X
Xiaoyu Wang 已提交
466 467 468 469 470 471 472
static ENodeType getScanOperatorType(EScanType scanType) {
  switch (scanType) {
    case SCAN_TYPE_TABLE:
      return QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
    case SCAN_TYPE_STREAM:
      return QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
    case SCAN_TYPE_TABLE_MERGE:
S
slzhou 已提交
473 474
      // return QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
      return QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN;
X
Xiaoyu Wang 已提交
475 476 477 478 479 480
    default:
      break;
  }
  return QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
}

X
Xiaoyu Wang 已提交
481 482
static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
                                        SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
483 484
  STableScanPhysiNode* pTableScan = (STableScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode,
                                                                        getScanOperatorType(pScanLogicNode->scanType));
X
Xiaoyu Wang 已提交
485 486 487 488
  if (NULL == pTableScan) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

X
Xiaoyu Wang 已提交
489
  memcpy(pTableScan->scanSeq, pScanLogicNode->scanSeq, sizeof(pScanLogicNode->scanSeq));
X
Xiaoyu Wang 已提交
490
  pTableScan->scanRange = pScanLogicNode->scanRange;
491
  pTableScan->ratio = pScanLogicNode->ratio;
5
54liuyao 已提交
492 493 494 495 496
  if (pScanLogicNode->pVgroupList) {
    vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
    pSubplan->execNodeStat.tableNum = pScanLogicNode->pVgroupList->vgroups[0].numOfTable;
  }
  if (pCxt->pExecNodeList) {
D
dapan1121 已提交
497 498
    SQueryNodeLoad node = {.addr = pSubplan->execNode, .load = 0};
    taosArrayPush(pCxt->pExecNodeList, &node);
5
54liuyao 已提交
499
  }
D
dapan1121 已提交
500
  tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
501 502
  pTableScan->dataRequired = pScanLogicNode->dataRequired;
  pTableScan->pDynamicScanFuncs = nodesCloneList(pScanLogicNode->pDynamicScanFuncs);
X
Xiaoyu Wang 已提交
503 504 505
  pTableScan->pPartitionTags = nodesCloneList(pScanLogicNode->pPartTags);
  if ((NULL != pScanLogicNode->pDynamicScanFuncs && NULL == pTableScan->pDynamicScanFuncs) ||
      (NULL != pScanLogicNode->pPartTags && NULL == pTableScan->pPartitionTags)) {
506
    nodesDestroyNode((SNode*)pTableScan);
507 508
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
509 510 511 512 513
  pTableScan->interval = pScanLogicNode->interval;
  pTableScan->offset = pScanLogicNode->offset;
  pTableScan->sliding = pScanLogicNode->sliding;
  pTableScan->intervalUnit = pScanLogicNode->intervalUnit;
  pTableScan->slidingUnit = pScanLogicNode->slidingUnit;
5
54liuyao 已提交
514 515 516
  pTableScan->triggerType = pScanLogicNode->triggerType;
  pTableScan->watermark = pScanLogicNode->watermark;
  pTableScan->tsColId = pScanLogicNode->tsColId;
5
54liuyao 已提交
517
  pTableScan->filesFactor = pScanLogicNode->filesFactor;
X
Xiaoyu Wang 已提交
518

X
Xiaoyu Wang 已提交
519
  return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode);
X
Xiaoyu Wang 已提交
520 521
}

X
Xiaoyu Wang 已提交
522 523
static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
                                              SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
524 525
  SSystemTableScanPhysiNode* pScan = (SSystemTableScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode,
                                                                               QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN);
X
Xiaoyu Wang 已提交
526 527 528 529
  if (NULL == pScan) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

D
dapan1121 已提交
530 531
  pScan->showRewrite = pScanLogicNode->showRewrite;
  pScan->accountId = pCxt->pPlanCxt->acctId;
532 533
  if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLES) ||
      0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED)) {
X
Xiaoyu Wang 已提交
534
    vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
535
    SQueryNodeLoad node = {.addr = pSubplan->execNode, .load = 0};
D
dapan1121 已提交
536
    taosArrayPush(pCxt->pExecNodeList, &node);
X
Xiaoyu Wang 已提交
537
  } else {
538
    SQueryNodeLoad node = {.addr = {.nodeId = MNODE_HANDLE, .epSet = pCxt->pPlanCxt->mgmtEpSet}, .load = 0};
D
dapan1121 已提交
539
    taosArrayPush(pCxt->pExecNodeList, &node);
X
Xiaoyu Wang 已提交
540
  }
X
Xiaoyu Wang 已提交
541
  pScan->mgmtEpSet = pCxt->pPlanCxt->mgmtEpSet;
D
dapan1121 已提交
542
  tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
X
Xiaoyu Wang 已提交
543

X
Xiaoyu Wang 已提交
544
  return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
X
Xiaoyu Wang 已提交
545 546
}

X
Xiaoyu Wang 已提交
547 548
static int32_t createStreamScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
                                         SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
549 550 551 552 553 554
  return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
}

static int32_t createTableMergeScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
                                             SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) {
  return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
555 556
}

X
Xiaoyu Wang 已提交
557 558
static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
                                   SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
559 560
  switch (pScanLogicNode->scanType) {
    case SCAN_TYPE_TAG:
561
      return createTagScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
562
    case SCAN_TYPE_TABLE:
X
Xiaoyu Wang 已提交
563
      return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
564
    case SCAN_TYPE_SYSTEM_TABLE:
X
Xiaoyu Wang 已提交
565
      return createSystemTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
566
    case SCAN_TYPE_STREAM:
X
Xiaoyu Wang 已提交
567
      return createStreamScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
568 569
    case SCAN_TYPE_TABLE_MERGE:
      return createTableMergeScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
570 571 572
    default:
      break;
  }
X
Xiaoyu Wang 已提交
573
  return TSDB_CODE_FAILED;
X
Xiaoyu Wang 已提交
574 575
}

X
Xiaoyu Wang 已提交
576 577
static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode,
                                   SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
578
  SJoinPhysiNode* pJoin =
579
      (SJoinPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pJoinLogicNode, QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN);
X
Xiaoyu Wang 已提交
580 581 582
  if (NULL == pJoin) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
583

584 585
  SDataBlockDescNode* pLeftDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc;
  SDataBlockDescNode* pRightDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 1))->pOutputDataBlockDesc;
X
Xiaoyu Wang 已提交
586
  int32_t             code = TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
587

588 589
  pJoin->joinType = pJoinLogicNode->joinType;
  if (NULL != pJoinLogicNode->pOnConditions) {
X
Xiaoyu Wang 已提交
590 591
    code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pOnConditions,
                         &pJoin->pOnConditions);
592
  }
X
Xiaoyu Wang 已提交
593
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
594 595
    code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->node.pTargets,
                         &pJoin->pTargets);
X
Xiaoyu Wang 已提交
596 597
  }
  if (TSDB_CODE_SUCCESS == code) {
X
bugfix  
Xiaoyu Wang 已提交
598
    code = addDataBlockSlots(pCxt, pJoin->pTargets, pJoin->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
599 600 601 602
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin);
  }
X
Xiaoyu Wang 已提交
603

X
Xiaoyu Wang 已提交
604 605 606
  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pJoin;
  } else {
607
    nodesDestroyNode((SNode*)pJoin);
X
Xiaoyu Wang 已提交
608
  }
X
Xiaoyu Wang 已提交
609

X
Xiaoyu Wang 已提交
610
  return code;
X
Xiaoyu Wang 已提交
611 612 613
}

typedef struct SRewritePrecalcExprsCxt {
X
Xiaoyu Wang 已提交
614 615 616
  int32_t    errCode;
  int32_t    planNodeId;
  int32_t    rewriteId;
X
Xiaoyu Wang 已提交
617 618 619 620 621
  SNodeList* pPrecalcExprs;
} SRewritePrecalcExprsCxt;

static EDealRes collectAndRewrite(SRewritePrecalcExprsCxt* pCxt, SNode** pNode) {
  SNode* pExpr = nodesCloneNode(*pNode);
X
bugfix  
Xiaoyu Wang 已提交
622
  if (NULL == pExpr) {
623
    pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
X
bugfix  
Xiaoyu Wang 已提交
624 625
    return DEAL_RES_ERROR;
  }
X
Xiaoyu Wang 已提交
626
  if (nodesListAppend(pCxt->pPrecalcExprs, pExpr)) {
627
    pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
X
Xiaoyu Wang 已提交
628 629 630 631 632
    nodesDestroyNode(pExpr);
    return DEAL_RES_ERROR;
  }
  SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
  if (NULL == pCol) {
633
    pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
X
Xiaoyu Wang 已提交
634 635 636 637 638 639 640 641
    nodesDestroyNode(pExpr);
    return DEAL_RES_ERROR;
  }
  SExprNode* pRewrittenExpr = (SExprNode*)pExpr;
  pCol->node.resType = pRewrittenExpr->resType;
  if ('\0' != pRewrittenExpr->aliasName[0]) {
    strcpy(pCol->colName, pRewrittenExpr->aliasName);
  } else {
X
Xiaoyu Wang 已提交
642 643
    snprintf(pRewrittenExpr->aliasName, sizeof(pRewrittenExpr->aliasName), "#expr_%d_%d", pCxt->planNodeId,
             pCxt->rewriteId);
X
Xiaoyu Wang 已提交
644 645 646 647 648 649 650
    strcpy(pCol->colName, pRewrittenExpr->aliasName);
  }
  nodesDestroyNode(*pNode);
  *pNode = (SNode*)pCol;
  return DEAL_RES_IGNORE_CHILD;
}

651 652 653 654 655
static int32_t rewriteValueToOperator(SRewritePrecalcExprsCxt* pCxt, SNode** pNode) {
  SOperatorNode* pOper = (SOperatorNode*)nodesMakeNode(QUERY_NODE_OPERATOR);
  if (NULL == pOper) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
656 657
  pOper->pLeft = nodesMakeNode(QUERY_NODE_LEFT_VALUE);
  if (NULL == pOper->pLeft) {
658
    nodesDestroyNode((SNode*)pOper);
659 660
    return TSDB_CODE_OUT_OF_MEMORY;
  }
661 662 663 664
  SValueNode* pVal = (SValueNode*)*pNode;
  pOper->node.resType = pVal->node.resType;
  strcpy(pOper->node.aliasName, pVal->node.aliasName);
  pOper->opType = OP_TYPE_ASSIGN;
665
  pOper->pRight = *pNode;
666 667 668 669
  *pNode = (SNode*)pOper;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
670 671 672
static EDealRes doRewritePrecalcExprs(SNode** pNode, void* pContext) {
  SRewritePrecalcExprsCxt* pCxt = (SRewritePrecalcExprsCxt*)pContext;
  switch (nodeType(*pNode)) {
673
    case QUERY_NODE_VALUE: {
674 675 676
      if (((SValueNode*)*pNode)->notReserved) {
        break;
      }
677 678
      pCxt->errCode = rewriteValueToOperator(pCxt, pNode);
      if (TSDB_CODE_SUCCESS != pCxt->errCode) {
679 680 681 682
        return DEAL_RES_ERROR;
      }
      return collectAndRewrite(pCxt, pNode);
    }
X
Xiaoyu Wang 已提交
683 684
    case QUERY_NODE_OPERATOR:
    case QUERY_NODE_LOGIC_CONDITION: {
685
      return collectAndRewrite(pCxt, pNode);
X
Xiaoyu Wang 已提交
686 687
    }
    case QUERY_NODE_FUNCTION: {
688
      if (fmIsScalarFunc(((SFunctionNode*)(*pNode))->funcId)) {
689
        return collectAndRewrite(pCxt, pNode);
X
Xiaoyu Wang 已提交
690 691 692 693 694 695 696 697
      }
    }
    default:
      break;
  }
  return DEAL_RES_CONTINUE;
}

X
Xiaoyu Wang 已提交
698 699
static int32_t rewritePrecalcExprs(SPhysiPlanContext* pCxt, SNodeList* pList, SNodeList** pPrecalcExprs,
                                   SNodeList** pRewrittenList) {
X
Xiaoyu Wang 已提交
700 701 702 703 704 705
  if (NULL == pList) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == *pPrecalcExprs) {
    *pPrecalcExprs = nodesMakeList();
X
bugfix  
Xiaoyu Wang 已提交
706 707 708
    if (NULL == *pPrecalcExprs) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
X
Xiaoyu Wang 已提交
709 710 711
  }
  if (NULL == *pRewrittenList) {
    *pRewrittenList = nodesMakeList();
X
bugfix  
Xiaoyu Wang 已提交
712 713 714
    if (NULL == *pRewrittenList) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
X
Xiaoyu Wang 已提交
715 716 717 718 719 720 721 722 723
  }
  SNode* pNode = NULL;
  FOREACH(pNode, pList) {
    SNode* pNew = NULL;
    if (QUERY_NODE_GROUPING_SET == nodeType(pNode)) {
      pNew = nodesCloneNode(nodesListGetNode(((SGroupingSetNode*)pNode)->pParameterList, 0));
    } else {
      pNew = nodesCloneNode(pNode);
    }
X
bugfix  
Xiaoyu Wang 已提交
724 725 726 727 728 729
    if (NULL == pNew) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    if (TSDB_CODE_SUCCESS != nodesListAppend(*pRewrittenList, pNew)) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
X
Xiaoyu Wang 已提交
730
  }
X
Xiaoyu Wang 已提交
731
  SRewritePrecalcExprsCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pPrecalcExprs = *pPrecalcExprs};
X
Xiaoyu Wang 已提交
732
  nodesRewriteExprs(*pRewrittenList, doRewritePrecalcExprs, &cxt);
733 734
  if (0 == LIST_LENGTH(cxt.pPrecalcExprs) || TSDB_CODE_SUCCESS != cxt.errCode) {
    DESTORY_LIST(*pPrecalcExprs);
X
Xiaoyu Wang 已提交
735 736 737 738
  }
  return cxt.errCode;
}

X
Xiaoyu Wang 已提交
739 740
static int32_t rewritePrecalcExpr(SPhysiPlanContext* pCxt, SNode* pNode, SNodeList** pPrecalcExprs,
                                  SNode** pRewritten) {
741 742 743 744 745
  if (NULL == pNode) {
    return TSDB_CODE_SUCCESS;
  }

  SNodeList* pList = NULL;
X
Xiaoyu Wang 已提交
746
  int32_t    code = nodesListMakeAppend(&pList, pNode);
747 748 749 750 751 752 753 754 755 756 757 758
  SNodeList* pRewrittenList = NULL;
  if (TSDB_CODE_SUCCESS == code) {
    code = rewritePrecalcExprs(pCxt, pList, pPrecalcExprs, &pRewrittenList);
  }
  if (TSDB_CODE_SUCCESS == code) {
    *pRewritten = nodesListGetNode(pRewrittenList, 0);
  }
  nodesClearList(pList);
  nodesClearList(pRewrittenList);
  return code;
}

X
Xiaoyu Wang 已提交
759 760
static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SAggLogicNode* pAggLogicNode,
                                  SPhysiNode** pPhyNode) {
761 762
  SAggPhysiNode* pAgg =
      (SAggPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pAggLogicNode, QUERY_NODE_PHYSICAL_PLAN_HASH_AGG);
X
Xiaoyu Wang 已提交
763 764 765
  if (NULL == pAgg) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
766 767 768 769

  SNodeList* pPrecalcExprs = NULL;
  SNodeList* pGroupKeys = NULL;
  SNodeList* pAggFuncs = NULL;
X
Xiaoyu Wang 已提交
770
  int32_t    code = rewritePrecalcExprs(pCxt, pAggLogicNode->pGroupKeys, &pPrecalcExprs, &pGroupKeys);
X
Xiaoyu Wang 已提交
771 772 773
  if (TSDB_CODE_SUCCESS == code) {
    code = rewritePrecalcExprs(pCxt, pAggLogicNode->pAggFuncs, &pPrecalcExprs, &pAggFuncs);
  }
X
Xiaoyu Wang 已提交
774

X
Xiaoyu Wang 已提交
775 776
  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  // push down expression to pOutputDataBlockDesc of child node
X
Xiaoyu Wang 已提交
777 778 779
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pAgg->pExprs);
    if (TSDB_CODE_SUCCESS == code) {
X
bugfix  
Xiaoyu Wang 已提交
780
      code = pushdownDataBlockSlots(pCxt, pAgg->pExprs, pChildTupe);
X
Xiaoyu Wang 已提交
781
    }
X
Xiaoyu Wang 已提交
782 783
  }

X
Xiaoyu Wang 已提交
784 785 786
  if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pGroupKeys, &pAgg->pGroupKeys);
    if (TSDB_CODE_SUCCESS == code) {
X
bugfix  
Xiaoyu Wang 已提交
787
      code = addDataBlockSlots(pCxt, pAgg->pGroupKeys, pAgg->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
788
    }
X
Xiaoyu Wang 已提交
789 790
  }

X
Xiaoyu Wang 已提交
791 792 793
  if (TSDB_CODE_SUCCESS == code && NULL != pAggFuncs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pAggFuncs, &pAgg->pAggFuncs);
    if (TSDB_CODE_SUCCESS == code) {
X
bugfix  
Xiaoyu Wang 已提交
794
      code = addDataBlockSlots(pCxt, pAgg->pAggFuncs, pAgg->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
795
    }
X
Xiaoyu Wang 已提交
796 797
  }

X
Xiaoyu Wang 已提交
798 799 800
  if (TSDB_CODE_SUCCESS == code) {
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pAggLogicNode, (SPhysiNode*)pAgg);
  }
X
Xiaoyu Wang 已提交
801

X
Xiaoyu Wang 已提交
802 803 804
  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pAgg;
  } else {
805
    nodesDestroyNode((SNode*)pAgg);
X
Xiaoyu Wang 已提交
806
  }
X
Xiaoyu Wang 已提交
807

X
bugfix  
Xiaoyu Wang 已提交
808 809 810 811
  nodesDestroyList(pPrecalcExprs);
  nodesDestroyList(pGroupKeys);
  nodesDestroyList(pAggFuncs);

X
Xiaoyu Wang 已提交
812
  return code;
X
Xiaoyu Wang 已提交
813 814
}

815 816 817
static int32_t createIndefRowsFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
                                            SIndefRowsFuncLogicNode* pFuncLogicNode, SPhysiNode** pPhyNode) {
  SIndefRowsFuncPhysiNode* pIdfRowsFunc = (SIndefRowsFuncPhysiNode*)makePhysiNode(
X
Xiaoyu Wang 已提交
818
      pCxt, (SLogicNode*)pFuncLogicNode, QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC);
819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845
  if (NULL == pIdfRowsFunc) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNodeList* pPrecalcExprs = NULL;
  SNodeList* pVectorFuncs = NULL;
  int32_t    code = rewritePrecalcExprs(pCxt, pFuncLogicNode->pVectorFuncs, &pPrecalcExprs, &pVectorFuncs);

  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  // push down expression to pOutputDataBlockDesc of child node
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pIdfRowsFunc->pExprs);
    if (TSDB_CODE_SUCCESS == code) {
      code = pushdownDataBlockSlots(pCxt, pIdfRowsFunc->pExprs, pChildTupe);
    }
  }

  if (TSDB_CODE_SUCCESS == code && NULL != pVectorFuncs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pVectorFuncs, &pIdfRowsFunc->pVectorFuncs);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlots(pCxt, pIdfRowsFunc->pVectorFuncs, pIdfRowsFunc->node.pOutputDataBlockDesc);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pIdfRowsFunc;
  } else {
846
    nodesDestroyNode((SNode*)pIdfRowsFunc);
847 848 849 850 851
  }

  return code;
}

X
Xiaoyu Wang 已提交
852 853
static int32_t createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
                                      SProjectLogicNode* pProjectLogicNode, SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
854 855
  SProjectPhysiNode* pProject =
      (SProjectPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pProjectLogicNode, QUERY_NODE_PHYSICAL_PLAN_PROJECT);
X
Xiaoyu Wang 已提交
856 857 858
  if (NULL == pProject) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
859

860 861 862 863 864
  pProject->limit = pProjectLogicNode->limit;
  pProject->offset = pProjectLogicNode->offset;
  pProject->slimit = pProjectLogicNode->slimit;
  pProject->soffset = pProjectLogicNode->soffset;

X
Xiaoyu Wang 已提交
865 866
  int32_t code = setListSlotId(pCxt, ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc->dataBlockId,
                               -1, pProjectLogicNode->pProjections, &pProject->pProjections);
X
Xiaoyu Wang 已提交
867
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
868 869
    code = addDataBlockSlotsForProject(pCxt, pProjectLogicNode->stmtName, pProject->pProjections,
                                       pProject->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
870 871 872 873
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pProjectLogicNode, (SPhysiNode*)pProject);
  }
X
Xiaoyu Wang 已提交
874

X
Xiaoyu Wang 已提交
875 876 877
  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pProject;
  } else {
878
    nodesDestroyNode((SNode*)pProject);
X
Xiaoyu Wang 已提交
879
  }
X
Xiaoyu Wang 已提交
880

X
Xiaoyu Wang 已提交
881
  return code;
X
Xiaoyu Wang 已提交
882 883
}

X
Xiaoyu Wang 已提交
884 885
static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode,
                                         SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
886 887
  SExchangePhysiNode* pExchange =
      (SExchangePhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE);
X
Xiaoyu Wang 已提交
888 889 890
  if (NULL == pExchange) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
891

X
Xiaoyu Wang 已提交
892
  pExchange->srcGroupId = pExchangeLogicNode->srcGroupId;
X
bugfix  
Xiaoyu Wang 已提交
893
  *pPhyNode = (SPhysiNode*)pExchange;
X
Xiaoyu Wang 已提交
894

X
bugfix  
Xiaoyu Wang 已提交
895
  return TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
896
}
X
Xiaoyu Wang 已提交
897

X
Xiaoyu Wang 已提交
898 899
static int32_t createStreamScanPhysiNodeByExchange(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode,
                                                   SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
900 901
  SScanPhysiNode* pScan =
      (SScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
X
Xiaoyu Wang 已提交
902 903 904
  if (NULL == pScan) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
905

X
bugfix  
Xiaoyu Wang 已提交
906 907 908 909 910 911
  int32_t code = TSDB_CODE_SUCCESS;

  pScan->pScanCols = nodesCloneList(pExchangeLogicNode->node.pTargets);
  if (NULL == pScan->pScanCols) {
    code = TSDB_CODE_OUT_OF_MEMORY;
  }
L
fix  
Liu Jicong 已提交
912 913 914 915 916

  if (TSDB_CODE_SUCCESS == code) {
    code = sortScanCols(pScan->pScanCols);
  }

917 918 919
  if (TSDB_CODE_SUCCESS == code) {
    code = sortScanCols(pScan->pScanCols);
  }
X
Xiaoyu Wang 已提交
920
  if (TSDB_CODE_SUCCESS == code) {
X
bugfix  
Xiaoyu Wang 已提交
921
    code = addDataBlockSlots(pCxt, pScan->pScanCols, pScan->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
922 923 924 925 926
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pScan;
  } else {
927
    nodesDestroyNode((SNode*)pScan);
X
Xiaoyu Wang 已提交
928 929 930 931 932
  }

  return code;
}

X
Xiaoyu Wang 已提交
933 934
static int32_t createExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode,
                                       SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
935 936
  if (pCxt->pPlanCxt->streamQuery) {
    return createStreamScanPhysiNodeByExchange(pCxt, pExchangeLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
937
  } else {
X
Xiaoyu Wang 已提交
938
    return doCreateExchangePhysiNode(pCxt, pExchangeLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
939
  }
X
Xiaoyu Wang 已提交
940 941
}

X
Xiaoyu Wang 已提交
942 943
static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWinodwPhysiNode* pWindow,
                                             SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
944 945
  SNodeList* pPrecalcExprs = NULL;
  SNodeList* pFuncs = NULL;
X
Xiaoyu Wang 已提交
946
  int32_t    code = rewritePrecalcExprs(pCxt, pWindowLogicNode->pFuncs, &pPrecalcExprs, &pFuncs);
X
Xiaoyu Wang 已提交
947 948 949 950 951 952

  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  // push down expression to pOutputDataBlockDesc of child node
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pWindow->pExprs);
    if (TSDB_CODE_SUCCESS == code) {
X
bugfix  
Xiaoyu Wang 已提交
953
      code = addDataBlockSlots(pCxt, pWindow->pExprs, pChildTupe);
X
Xiaoyu Wang 已提交
954 955 956
    }
  }

957 958 959 960
  if (TSDB_CODE_SUCCESS == code) {
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pWindowLogicNode->pTspk, &pWindow->pTspk);
  }

X
Xiaoyu Wang 已提交
961 962 963
  if (TSDB_CODE_SUCCESS == code && NULL != pFuncs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncs, &pWindow->pFuncs);
    if (TSDB_CODE_SUCCESS == code) {
X
bugfix  
Xiaoyu Wang 已提交
964
      code = addDataBlockSlots(pCxt, pWindow->pFuncs, pWindow->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
965 966 967
    }
  }

X
Xiaoyu Wang 已提交
968 969
  pWindow->triggerType = pWindowLogicNode->triggerType;
  pWindow->watermark = pWindowLogicNode->watermark;
5
54liuyao 已提交
970
  pWindow->filesFactor = pWindowLogicNode->filesFactor;
X
Xiaoyu Wang 已提交
971

X
Xiaoyu Wang 已提交
972 973 974
  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pWindow;
  } else {
975
    nodesDestroyNode((SNode*)pWindow);
X
Xiaoyu Wang 已提交
976 977 978 979 980
  }

  return code;
}

981 982
static ENodeType getIntervalOperatorType(EWindowAlgorithm windowAlgo) {
  switch (windowAlgo) {
X
Xiaoyu Wang 已提交
983 984
    case INTERVAL_ALGO_HASH:
      return QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL;
X
Xiaoyu Wang 已提交
985 986
    case INTERVAL_ALGO_MERGE:
      return QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL;
X
Xiaoyu Wang 已提交
987 988 989 990 991 992
    case INTERVAL_ALGO_STREAM_FINAL:
      return QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL;
    case INTERVAL_ALGO_STREAM_SEMI:
      return QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL;
    case INTERVAL_ALGO_STREAM_SINGLE:
      return QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL;
993 994 995 996 997 998 999 1000
    case SESSION_ALGO_STREAM_FINAL:
      return QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION;
    case SESSION_ALGO_STREAM_SEMI:
      return QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION;
    case SESSION_ALGO_STREAM_SINGLE:
      return QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
    case SESSION_ALGO_MERGE:
      return QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION;
X
Xiaoyu Wang 已提交
1001 1002
    default:
      break;
1003
  }
X
Xiaoyu Wang 已提交
1004
  return QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL;
1005 1006
}

X
Xiaoyu Wang 已提交
1007 1008 1009
static int32_t createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
                                       SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
  SIntervalPhysiNode* pInterval = (SIntervalPhysiNode*)makePhysiNode(
1010
      pCxt, (SLogicNode*)pWindowLogicNode, getIntervalOperatorType(pWindowLogicNode->windowAlgo));
X
Xiaoyu Wang 已提交
1011 1012 1013
  if (NULL == pInterval) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
1014 1015 1016 1017

  pInterval->interval = pWindowLogicNode->interval;
  pInterval->offset = pWindowLogicNode->offset;
  pInterval->sliding = pWindowLogicNode->sliding;
H
Haojun Liao 已提交
1018 1019 1020
  pInterval->intervalUnit = pWindowLogicNode->intervalUnit;
  pInterval->slidingUnit = pWindowLogicNode->slidingUnit;

X
Xiaoyu Wang 已提交
1021 1022 1023
  return createWindowPhysiNodeFinalize(pCxt, pChildren, &pInterval->window, pWindowLogicNode, pPhyNode);
}

X
Xiaoyu Wang 已提交
1024 1025 1026
static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
                                            SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
  SSessionWinodwPhysiNode* pSession = (SSessionWinodwPhysiNode*)makePhysiNode(
1027
      pCxt, (SLogicNode*)pWindowLogicNode, getIntervalOperatorType(pWindowLogicNode->windowAlgo));
X
Xiaoyu Wang 已提交
1028 1029
  if (NULL == pSession) {
    return TSDB_CODE_OUT_OF_MEMORY;
X
Xiaoyu Wang 已提交
1030 1031
  }

X
Xiaoyu Wang 已提交
1032
  pSession->gap = pWindowLogicNode->sessionGap;
X
Xiaoyu Wang 已提交
1033

X
Xiaoyu Wang 已提交
1034
  return createWindowPhysiNodeFinalize(pCxt, pChildren, &pSession->window, pWindowLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1035 1036
}

X
Xiaoyu Wang 已提交
1037 1038
static int32_t createStateWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
                                          SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
1039 1040 1041
  SStateWinodwPhysiNode* pState = (SStateWinodwPhysiNode*)makePhysiNode(
      pCxt, (SLogicNode*)pWindowLogicNode,
      (pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE : QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE));
1042 1043 1044 1045 1046
  if (NULL == pState) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNodeList* pPrecalcExprs = NULL;
X
Xiaoyu Wang 已提交
1047 1048
  SNode*     pStateKey = NULL;
  int32_t    code = rewritePrecalcExpr(pCxt, pWindowLogicNode->pStateExpr, &pPrecalcExprs, &pStateKey);
1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066

  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  // push down expression to pOutputDataBlockDesc of child node
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pState->window.pExprs);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlots(pCxt, pState->window.pExprs, pChildTupe);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pStateKey, &pState->pStateKey);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlot(pCxt, &pState->pStateKey, pState->window.node.pOutputDataBlockDesc);
    }
  }

  if (TSDB_CODE_SUCCESS != code) {
1067
    nodesDestroyNode((SNode*)pState);
1068 1069 1070 1071 1072 1073
    return code;
  }

  return createWindowPhysiNodeFinalize(pCxt, pChildren, &pState->window, pWindowLogicNode, pPhyNode);
}

X
Xiaoyu Wang 已提交
1074 1075
static int32_t createWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode,
                                     SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
1076 1077
  switch (pWindowLogicNode->winType) {
    case WINDOW_TYPE_INTERVAL:
X
Xiaoyu Wang 已提交
1078
      return createIntervalPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1079
    case WINDOW_TYPE_SESSION:
X
Xiaoyu Wang 已提交
1080
      return createSessionWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1081
    case WINDOW_TYPE_STATE:
1082
      return createStateWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1083 1084 1085
    default:
      break;
  }
X
Xiaoyu Wang 已提交
1086
  return TSDB_CODE_FAILED;
X
Xiaoyu Wang 已提交
1087 1088
}

X
Xiaoyu Wang 已提交
1089 1090
static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SSortLogicNode* pSortLogicNode,
                                   SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
1091 1092
  SSortPhysiNode* pSort =
      (SSortPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pSortLogicNode, QUERY_NODE_PHYSICAL_PLAN_SORT);
X
Xiaoyu Wang 已提交
1093 1094 1095 1096 1097 1098
  if (NULL == pSort) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNodeList* pPrecalcExprs = NULL;
  SNodeList* pSortKeys = NULL;
X
Xiaoyu Wang 已提交
1099
  int32_t    code = rewritePrecalcExprs(pCxt, pSortLogicNode->pSortKeys, &pPrecalcExprs, &pSortKeys);
X
Xiaoyu Wang 已提交
1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111

  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  // push down expression to pOutputDataBlockDesc of child node
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pSort->pExprs);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlots(pCxt, pSort->pExprs, pChildTupe);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pSortKeys, &pSort->pSortKeys);
X
Xiaoyu Wang 已提交
1112 1113 1114 1115
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pSortLogicNode->node.pTargets, &pSort->pTargets);
X
Xiaoyu Wang 已提交
1116
    if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
1117
      code = addDataBlockSlots(pCxt, pSort->pTargets, pSort->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
1118 1119 1120 1121 1122 1123
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pSort;
  } else {
1124
    nodesDestroyNode((SNode*)pSort);
X
Xiaoyu Wang 已提交
1125 1126 1127 1128 1129
  }

  return code;
}

X
Xiaoyu Wang 已提交
1130 1131
static int32_t createPartitionPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
                                        SPartitionLogicNode* pPartLogicNode, SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
1132 1133
  SPartitionPhysiNode* pPart =
      (SPartitionPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pPartLogicNode, QUERY_NODE_PHYSICAL_PLAN_PARTITION);
1134 1135 1136 1137 1138 1139
  if (NULL == pPart) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNodeList* pPrecalcExprs = NULL;
  SNodeList* pPartitionKeys = NULL;
X
Xiaoyu Wang 已提交
1140
  int32_t    code = rewritePrecalcExprs(pCxt, pPartLogicNode->pPartitionKeys, &pPrecalcExprs, &pPartitionKeys);
1141 1142 1143 1144 1145 1146

  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  // push down expression to pOutputDataBlockDesc of child node
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pPart->pExprs);
    if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
1147
      code = pushdownDataBlockSlots(pCxt, pPart->pExprs, pChildTupe);
1148 1149 1150 1151 1152
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPartitionKeys, &pPart->pPartitionKeys);
X
Xiaoyu Wang 已提交
1153 1154 1155 1156
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPartLogicNode->node.pTargets, &pPart->pTargets);
1157
    if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
1158
      code = addDataBlockSlots(pCxt, pPart->pTargets, pPart->node.pOutputDataBlockDesc);
1159 1160 1161 1162 1163 1164
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pPart;
  } else {
1165
    nodesDestroyNode((SNode*)pPart);
1166 1167 1168 1169 1170
  }

  return code;
}

X
Xiaoyu Wang 已提交
1171 1172
static int32_t createFillPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SFillLogicNode* pFillNode,
                                   SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
1173
  SFillPhysiNode* pFill = (SFillPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pFillNode, QUERY_NODE_PHYSICAL_PLAN_FILL);
X
Xiaoyu Wang 已提交
1174 1175 1176 1177
  if (NULL == pFill) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

X
Xiaoyu Wang 已提交
1178 1179 1180
  pFill->mode = pFillNode->mode;
  pFill->timeRange = pFillNode->timeRange;

X
Xiaoyu Wang 已提交
1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203
  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  int32_t code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFillNode->node.pTargets, &pFill->pTargets);
  if (TSDB_CODE_SUCCESS == code) {
    code = addDataBlockSlots(pCxt, pFill->pTargets, pFill->node.pOutputDataBlockDesc);
  }

  if (TSDB_CODE_SUCCESS == code) {
    pFill->pWStartTs = nodesCloneNode(pFillNode->pWStartTs);
    if (NULL == pFill->pWStartTs) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  if (TSDB_CODE_SUCCESS == code && NULL != pFillNode->pValues) {
    pFill->pValues = nodesCloneNode(pFillNode->pValues);
    if (NULL == pFill->pValues) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pFill;
  } else {
1204
    nodesDestroyNode((SNode*)pFill);
X
Xiaoyu Wang 已提交
1205 1206 1207 1208 1209
  }

  return code;
}

X
Xiaoyu Wang 已提交
1210
static int32_t createExchangePhysiNodeByMerge(SMergePhysiNode* pMerge) {
1211
  SExchangePhysiNode* pExchange = (SExchangePhysiNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_EXCHANGE);
X
Xiaoyu Wang 已提交
1212 1213 1214 1215
  if (NULL == pExchange) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pExchange->srcGroupId = pMerge->srcGroupId;
D
dapan1121 已提交
1216
  pExchange->singleChannel = true;
X
Xiaoyu Wang 已提交
1217
  pExchange->node.pParent = (SPhysiNode*)pMerge;
1218
  pExchange->node.pOutputDataBlockDesc = (SDataBlockDescNode*)nodesCloneNode((SNode*)pMerge->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
1219
  if (NULL == pExchange->node.pOutputDataBlockDesc) {
1220
    nodesDestroyNode((SNode*)pExchange);
X
Xiaoyu Wang 已提交
1221 1222
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
1223 1224
  SNode* pSlot = NULL;
  FOREACH(pSlot, pExchange->node.pOutputDataBlockDesc->pSlots) { ((SSlotDescNode*)pSlot)->output = true; }
1225
  return nodesListMakeStrictAppend(&pMerge->node.pChildren, (SNode*)pExchange);
X
Xiaoyu Wang 已提交
1226 1227 1228
}

static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SMergeLogicNode* pMergeLogicNode, SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
1229 1230
  SMergePhysiNode* pMerge =
      (SMergePhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pMergeLogicNode, QUERY_NODE_PHYSICAL_PLAN_MERGE);
X
Xiaoyu Wang 已提交
1231 1232 1233 1234 1235 1236 1237
  if (NULL == pMerge) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pMerge->numOfChannels = pMergeLogicNode->numOfChannels;
  pMerge->srcGroupId = pMergeLogicNode->srcGroupId;

X
Xiaoyu Wang 已提交
1238
  int32_t code = addDataBlockSlots(pCxt, pMergeLogicNode->pInputs, pMerge->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
1239

X
Xiaoyu Wang 已提交
1240 1241 1242 1243 1244 1245
  if (TSDB_CODE_SUCCESS == code) {
    for (int32_t i = 0; i < pMerge->numOfChannels; ++i) {
      code = createExchangePhysiNodeByMerge(pMerge);
      if (TSDB_CODE_SUCCESS != code) {
        break;
      }
X
Xiaoyu Wang 已提交
1246 1247 1248 1249 1250 1251 1252 1253
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pMerge->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->pMergeKeys,
                         &pMerge->pMergeKeys);
  }

X
Xiaoyu Wang 已提交
1254 1255 1256 1257
  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pMerge->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->node.pTargets,
                         &pMerge->pTargets);
  }
X
Xiaoyu Wang 已提交
1258 1259 1260
  if (TSDB_CODE_SUCCESS == code) {
    code = addDataBlockSlots(pCxt, pMerge->pTargets, pMerge->node.pOutputDataBlockDesc);
  }
X
Xiaoyu Wang 已提交
1261

X
Xiaoyu Wang 已提交
1262 1263 1264
  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pMerge;
  } else {
1265
    nodesDestroyNode((SNode*)pMerge);
X
Xiaoyu Wang 已提交
1266 1267 1268 1269 1270
  }

  return code;
}

X
Xiaoyu Wang 已提交
1271 1272
static int32_t doCreatePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, SSubplan* pSubplan,
                                 SNodeList* pChildren, SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
1273
  switch (nodeType(pLogicNode)) {
X
Xiaoyu Wang 已提交
1274
    case QUERY_NODE_LOGIC_PLAN_SCAN:
X
Xiaoyu Wang 已提交
1275
      return createScanPhysiNode(pCxt, pSubplan, (SScanLogicNode*)pLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1276
    case QUERY_NODE_LOGIC_PLAN_JOIN:
X
Xiaoyu Wang 已提交
1277
      return createJoinPhysiNode(pCxt, pChildren, (SJoinLogicNode*)pLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1278
    case QUERY_NODE_LOGIC_PLAN_AGG:
X
Xiaoyu Wang 已提交
1279
      return createAggPhysiNode(pCxt, pChildren, (SAggLogicNode*)pLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1280
    case QUERY_NODE_LOGIC_PLAN_PROJECT:
X
Xiaoyu Wang 已提交
1281
      return createProjectPhysiNode(pCxt, pChildren, (SProjectLogicNode*)pLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1282
    case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
X
Xiaoyu Wang 已提交
1283
      return createExchangePhysiNode(pCxt, (SExchangeLogicNode*)pLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1284
    case QUERY_NODE_LOGIC_PLAN_WINDOW:
X
Xiaoyu Wang 已提交
1285
      return createWindowPhysiNode(pCxt, pChildren, (SWindowLogicNode*)pLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1286 1287
    case QUERY_NODE_LOGIC_PLAN_SORT:
      return createSortPhysiNode(pCxt, pChildren, (SSortLogicNode*)pLogicNode, pPhyNode);
1288 1289
    case QUERY_NODE_LOGIC_PLAN_PARTITION:
      return createPartitionPhysiNode(pCxt, pChildren, (SPartitionLogicNode*)pLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1290 1291
    case QUERY_NODE_LOGIC_PLAN_FILL:
      return createFillPhysiNode(pCxt, pChildren, (SFillLogicNode*)pLogicNode, pPhyNode);
1292 1293
    case QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC:
      return createIndefRowsFuncPhysiNode(pCxt, pChildren, (SIndefRowsFuncLogicNode*)pLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1294 1295
    case QUERY_NODE_LOGIC_PLAN_MERGE:
      return createMergePhysiNode(pCxt, (SMergeLogicNode*)pLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1296 1297 1298
    default:
      break;
  }
X
Xiaoyu Wang 已提交
1299 1300 1301 1302

  return TSDB_CODE_FAILED;
}

X
Xiaoyu Wang 已提交
1303 1304
static int32_t createPhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, SSubplan* pSubplan,
                               SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316
  SNodeList* pChildren = nodesMakeList();
  if (NULL == pChildren) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  SNode* pLogicChild;
  FOREACH(pLogicChild, pLogicNode->pChildren) {
    SPhysiNode* pChild = NULL;
    code = createPhysiNode(pCxt, (SLogicNode*)pLogicChild, pSubplan, &pChild);
    if (TSDB_CODE_SUCCESS == code) {
1317
      code = nodesListStrictAppend(pChildren, (SNode*)pChild);
X
Xiaoyu Wang 已提交
1318 1319 1320 1321 1322
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = doCreatePhysiNode(pCxt, pLogicNode, pSubplan, pChildren, pPhyNode);
X
Xiaoyu Wang 已提交
1323
  }
X
Xiaoyu Wang 已提交
1324

X
Xiaoyu Wang 已提交
1325
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
1326 1327 1328 1329 1330 1331 1332
    if (LIST_LENGTH(pChildren) > 0) {
      (*pPhyNode)->pChildren = pChildren;
      SNode* pChild;
      FOREACH(pChild, (*pPhyNode)->pChildren) { ((SPhysiNode*)pChild)->pParent = (*pPhyNode); }
    } else {
      nodesDestroyList(pChildren);
    }
X
Xiaoyu Wang 已提交
1333 1334
  } else {
    nodesDestroyList(pChildren);
X
Xiaoyu Wang 已提交
1335 1336
  }

X
Xiaoyu Wang 已提交
1337
  return code;
X
Xiaoyu Wang 已提交
1338 1339
}

X
Xiaoyu Wang 已提交
1340
static int32_t createDataInserter(SPhysiPlanContext* pCxt, SVgDataBlocks* pBlocks, SDataSinkNode** pSink) {
1341
  SDataInserterNode* pInserter = (SDataInserterNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_INSERT);
X
Xiaoyu Wang 已提交
1342 1343 1344 1345
  if (NULL == pInserter) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

1346 1347
  pInserter->numOfTables = pBlocks->numOfTables;
  pInserter->size = pBlocks->size;
wafwerar's avatar
wafwerar 已提交
1348
  TSWAP(pInserter->pData, pBlocks->pData);
X
Xiaoyu Wang 已提交
1349 1350 1351

  *pSink = (SDataSinkNode*)pInserter;
  return TSDB_CODE_SUCCESS;
1352 1353
}

X
Xiaoyu Wang 已提交
1354
static int32_t createDataDispatcher(SPhysiPlanContext* pCxt, const SPhysiNode* pRoot, SDataSinkNode** pSink) {
1355
  SDataDispatcherNode* pDispatcher = (SDataDispatcherNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_DISPATCH);
X
Xiaoyu Wang 已提交
1356 1357 1358 1359
  if (NULL == pDispatcher) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

1360
  pDispatcher->sink.pInputDataBlockDesc = (SDataBlockDescNode*)nodesCloneNode((SNode*)pRoot->pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
1361
  if (NULL == pDispatcher->sink.pInputDataBlockDesc) {
1362
    nodesDestroyNode((SNode*)pDispatcher);
X
Xiaoyu Wang 已提交
1363 1364 1365 1366 1367
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  *pSink = (SDataSinkNode*)pDispatcher;
  return TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
1368 1369
}

X
Xiaoyu Wang 已提交
1370
static SSubplan* makeSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan) {
1371
  SSubplan* pSubplan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
X
Xiaoyu Wang 已提交
1372 1373 1374
  if (NULL == pSubplan) {
    return NULL;
  }
X
Xiaoyu Wang 已提交
1375
  pSubplan->id = pLogicSubplan->id;
X
Xiaoyu Wang 已提交
1376 1377
  pSubplan->subplanType = pLogicSubplan->subplanType;
  pSubplan->level = pLogicSubplan->level;
X
Xiaoyu Wang 已提交
1378 1379 1380
  return pSubplan;
}

X
Xiaoyu Wang 已提交
1381 1382 1383 1384 1385 1386 1387 1388 1389 1390
static int32_t buildInsertSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
  pSubplan->msgType = pModify->msgType;
  pSubplan->execNode.epSet = pModify->pVgDataBlocks->vg.epSet;
  SQueryNodeLoad node = {.addr = pSubplan->execNode, .load = 0};
  taosArrayPush(pCxt->pExecNodeList, &node);
  return createDataInserter(pCxt, pModify->pVgDataBlocks, &pSubplan->pDataSink);
}

static int32_t createDataDeleter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, const SPhysiNode* pRoot,
                                 SDataSinkNode** pSink) {
1391
  SDataDeleterNode* pDeleter = (SDataDeleterNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_DELETE);
X
Xiaoyu Wang 已提交
1392 1393 1394 1395 1396 1397 1398 1399 1400
  if (NULL == pDeleter) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pDeleter->tableId = pModify->tableId;
  pDeleter->tableType = pModify->tableType;
  strcpy(pDeleter->tableFName, pModify->tableFName);
  pDeleter->deleteTimeRange = pModify->deleteTimeRange;

X
Xiaoyu Wang 已提交
1401 1402 1403
  int32_t code = setNodeSlotId(pCxt, pRoot->pOutputDataBlockDesc->dataBlockId, -1, pModify->pAffectedRows,
                               &pDeleter->pAffectedRows);
  if (TSDB_CODE_SUCCESS == code) {
1404
    pDeleter->sink.pInputDataBlockDesc = (SDataBlockDescNode*)nodesCloneNode((SNode*)pRoot->pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
1405 1406 1407 1408 1409 1410 1411 1412
    if (NULL == pDeleter->sink.pInputDataBlockDesc) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pSink = (SDataSinkNode*)pDeleter;
  } else {
1413
    nodesDestroyNode((SNode*)pDeleter);
X
Xiaoyu Wang 已提交
1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424
  }

  return TSDB_CODE_SUCCESS;
}

static int32_t buildDeleteSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
  int32_t code =
      createPhysiNode(pCxt, (SLogicNode*)nodesListGetNode(pModify->node.pChildren, 0), pSubplan, &pSubplan->pNode);
  if (TSDB_CODE_SUCCESS == code) {
    code = createDataDeleter(pCxt, pModify, pSubplan->pNode, &pSubplan->pDataSink);
  }
D
dapan1121 已提交
1425
  pSubplan->msgType = TDMT_VND_DELETE;
X
Xiaoyu Wang 已提交
1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445
  return code;
}

static int32_t buildVnodeModifySubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SSubplan* pSubplan) {
  int32_t                code = TSDB_CODE_SUCCESS;
  SVnodeModifyLogicNode* pModify = (SVnodeModifyLogicNode*)pLogicSubplan->pNode;
  switch (pModify->modifyType) {
    case MODIFY_TABLE_TYPE_INSERT:
      code = buildInsertSubplan(pCxt, pModify, pSubplan);
      break;
    case MODIFY_TABLE_TYPE_DELETE:
      code = buildDeleteSubplan(pCxt, pModify, pSubplan);
      break;
    default:
      code = TSDB_CODE_FAILED;
      break;
  }
  return code;
}

X
Xiaoyu Wang 已提交
1446
static int32_t createPhysiSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SSubplan** pPhysiSubplan) {
X
Xiaoyu Wang 已提交
1447
  SSubplan* pSubplan = makeSubplan(pCxt, pLogicSubplan);
X
Xiaoyu Wang 已提交
1448 1449 1450 1451 1452 1453
  if (NULL == pSubplan) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;

1454
  if (SUBPLAN_TYPE_MODIFY == pLogicSubplan->subplanType) {
X
Xiaoyu Wang 已提交
1455
    code = buildVnodeModifySubplan(pCxt, pLogicSubplan, pSubplan);
1456
  } else {
1457
    pSubplan->msgType = TDMT_VND_QUERY;
X
Xiaoyu Wang 已提交
1458 1459 1460 1461
    code = createPhysiNode(pCxt, pLogicSubplan->pNode, pSubplan, &pSubplan->pNode);
    if (TSDB_CODE_SUCCESS == code && !pCxt->pPlanCxt->streamQuery && !pCxt->pPlanCxt->topicQuery) {
      code = createDataDispatcher(pCxt, pSubplan->pNode, &pSubplan->pDataSink);
    }
1462
  }
X
Xiaoyu Wang 已提交
1463

X
Xiaoyu Wang 已提交
1464 1465 1466
  if (TSDB_CODE_SUCCESS == code) {
    *pPhysiSubplan = pSubplan;
  } else {
1467
    nodesDestroyNode((SNode*)pSubplan);
X
Xiaoyu Wang 已提交
1468
  }
X
Xiaoyu Wang 已提交
1469

X
Xiaoyu Wang 已提交
1470
  return code;
X
Xiaoyu Wang 已提交
1471 1472
}

X
Xiaoyu Wang 已提交
1473
static SQueryPlan* makeQueryPhysiPlan(SPhysiPlanContext* pCxt) {
1474
  SQueryPlan* pPlan = (SQueryPlan*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN);
X
Xiaoyu Wang 已提交
1475 1476 1477 1478 1479
  if (NULL == pPlan) {
    return NULL;
  }
  pPlan->pSubplans = nodesMakeList();
  if (NULL == pPlan->pSubplans) {
1480
    nodesDestroyNode((SNode*)pPlan);
X
Xiaoyu Wang 已提交
1481
    return NULL;
1482
  }
X
Xiaoyu Wang 已提交
1483 1484
  pPlan->queryId = pCxt->pPlanCxt->queryId;
  return pPlan;
1485 1486
}

1487 1488
static int32_t pushSubplan(SPhysiPlanContext* pCxt, SNode* pSubplan, int32_t level, SNodeList* pSubplans) {
  SNodeListNode* pGroup = NULL;
1489
  if (level >= LIST_LENGTH(pSubplans)) {
1490
    pGroup = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST);
X
bugfix  
Xiaoyu Wang 已提交
1491 1492 1493
    if (NULL == pGroup) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
1494
    if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pSubplans, (SNode*)pGroup)) {
X
bugfix  
Xiaoyu Wang 已提交
1495 1496
      return TSDB_CODE_OUT_OF_MEMORY;
    }
1497
  } else {
1498
    pGroup = (SNodeListNode*)nodesListGetNode(pSubplans, level);
1499 1500 1501
  }
  if (NULL == pGroup->pNodeList) {
    pGroup->pNodeList = nodesMakeList();
X
bugfix  
Xiaoyu Wang 已提交
1502 1503 1504
    if (NULL == pGroup->pNodeList) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
1505
  }
1506
  return nodesListStrictAppend(pGroup->pNodeList, (SNode*)pSubplan);
1507 1508
}

X
Xiaoyu Wang 已提交
1509 1510
static int32_t buildPhysiPlan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SSubplan* pParent,
                              SQueryPlan* pQueryPlan) {
X
Xiaoyu Wang 已提交
1511
  SSubplan* pSubplan = NULL;
X
Xiaoyu Wang 已提交
1512
  int32_t   code = createPhysiSubplan(pCxt, pLogicSubplan, &pSubplan);
X
Xiaoyu Wang 已提交
1513

X
Xiaoyu Wang 已提交
1514
  if (TSDB_CODE_SUCCESS == code) {
1515
    code = pushSubplan(pCxt, (SNode*)pSubplan, pLogicSubplan->level, pQueryPlan->pSubplans);
X
Xiaoyu Wang 已提交
1516
    ++(pQueryPlan->numOfSubplans);
1517 1518
  }

1519
  if (TSDB_CODE_SUCCESS != code) {
1520
    nodesDestroyNode((SNode*)pSubplan);
1521 1522 1523
    return code;
  }

X
Xiaoyu Wang 已提交
1524
  if (TSDB_CODE_SUCCESS == code && NULL != pParent) {
1525
    code = nodesListMakeAppend(&pParent->pChildren, (SNode*)pSubplan);
X
Xiaoyu Wang 已提交
1526
    if (TSDB_CODE_SUCCESS == code) {
1527
      code = nodesListMakeAppend(&pSubplan->pParents, (SNode*)pParent);
X
Xiaoyu Wang 已提交
1528 1529
    }
  }
X
Xiaoyu Wang 已提交
1530

X
Xiaoyu Wang 已提交
1531 1532 1533 1534 1535 1536
  if (TSDB_CODE_SUCCESS == code) {
    SNode* pChild = NULL;
    FOREACH(pChild, pLogicSubplan->pChildren) {
      code = buildPhysiPlan(pCxt, (SLogicSubplan*)pChild, pSubplan, pQueryPlan);
      if (TSDB_CODE_SUCCESS != code) {
        break;
X
Xiaoyu Wang 已提交
1537 1538 1539
      }
    }
  }
X
Xiaoyu Wang 已提交
1540

X
Xiaoyu Wang 已提交
1541
  return code;
1542 1543
}

X
Xiaoyu Wang 已提交
1544
static int32_t doCreatePhysiPlan(SPhysiPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPhysiPlan) {
1545
  SQueryPlan* pPlan = (SQueryPlan*)makeQueryPhysiPlan(pCxt);
X
Xiaoyu Wang 已提交
1546 1547
  if (NULL == pPlan) {
    return TSDB_CODE_OUT_OF_MEMORY;
1548
  }
X
Xiaoyu Wang 已提交
1549

X
Xiaoyu Wang 已提交
1550
  int32_t code = TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
1551

X
Xiaoyu Wang 已提交
1552 1553 1554 1555 1556 1557
  SNode* pSubplan = NULL;
  FOREACH(pSubplan, pLogicPlan->pTopSubplans) {
    code = buildPhysiPlan(pCxt, (SLogicSubplan*)pSubplan, NULL, pPlan);
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
X
Xiaoyu Wang 已提交
1558 1559
  }

X
Xiaoyu Wang 已提交
1560 1561 1562
  if (TSDB_CODE_SUCCESS == code) {
    *pPhysiPlan = pPlan;
  } else {
1563
    nodesDestroyNode((SNode*)pPlan);
X
Xiaoyu Wang 已提交
1564 1565
  }

X
Xiaoyu Wang 已提交
1566
  return code;
X
Xiaoyu Wang 已提交
1567
}
X
Xiaoyu Wang 已提交
1568

X
Xiaoyu Wang 已提交
1569
static void destoryLocationHash(void* p) {
X
Xiaoyu Wang 已提交
1570
  SHashObj*   pHash = *(SHashObj**)p;
X
bugfix  
Xiaoyu Wang 已提交
1571 1572 1573 1574 1575
  SSlotIndex* pIndex = taosHashIterate(pHash, NULL);
  while (NULL != pIndex) {
    taosArrayDestroy(pIndex->pSlotIdsInfo);
    pIndex = taosHashIterate(pHash, pIndex);
  }
X
Xiaoyu Wang 已提交
1576 1577 1578 1579 1580 1581 1582
  taosHashCleanup(pHash);
}

static void destoryPhysiPlanContext(SPhysiPlanContext* pCxt) {
  taosArrayDestroyEx(pCxt->pLocationHelper, destoryLocationHash);
}

1583 1584 1585 1586 1587
static void setExplainInfo(SPlanContext* pCxt, SQueryPlan* pPlan) {
  if (QUERY_NODE_EXPLAIN_STMT == nodeType(pCxt->pAstRoot)) {
    SExplainStmt* pStmt = (SExplainStmt*)pCxt->pAstRoot;
    pPlan->explainInfo.mode = pStmt->analyze ? EXPLAIN_MODE_ANALYZE : EXPLAIN_MODE_STATIC;
    pPlan->explainInfo.verbose = pStmt->pOptions->verbose;
1588
    pPlan->explainInfo.ratio = pStmt->pOptions->ratio;
1589 1590 1591 1592 1593
  } else {
    pPlan->explainInfo.mode = EXPLAIN_MODE_DISABLE;
  }
}

X
Xiaoyu Wang 已提交
1594
int32_t createPhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan, SArray* pExecNodeList) {
X
Xiaoyu Wang 已提交
1595 1596 1597 1598 1599
  SPhysiPlanContext cxt = {.pPlanCxt = pCxt,
                           .errCode = TSDB_CODE_SUCCESS,
                           .nextDataBlockId = 0,
                           .pLocationHelper = taosArrayInit(32, POINTER_BYTES),
                           .pExecNodeList = pExecNodeList};
X
Xiaoyu Wang 已提交
1600 1601 1602
  if (NULL == cxt.pLocationHelper) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
1603

1604 1605
  if (QUERY_POLICY_VNODE == tsQueryPolicy) {
    taosArrayClear(pExecNodeList);
X
Xiaoyu Wang 已提交
1606
  }
1607 1608

  int32_t code = doCreatePhysiPlan(&cxt, pLogicPlan, pPlan);
1609 1610 1611 1612
  if (TSDB_CODE_SUCCESS == code) {
    setExplainInfo(pCxt, *pPlan);
  }

X
Xiaoyu Wang 已提交
1613 1614
  destoryPhysiPlanContext(&cxt);
  return code;
X
Xiaoyu Wang 已提交
1615
}