planPhysiCreater.c 74.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

X
Xiaoyu Wang 已提交
16
#include "planInt.h"
X
Xiaoyu Wang 已提交
17

X
Xiaoyu Wang 已提交
18
#include "catalog.h"
X
Xiaoyu Wang 已提交
19
#include "functionMgt.h"
20
#include "systable.h"
X
Xiaoyu Wang 已提交
21
#include "tglobal.h"
X
Xiaoyu Wang 已提交
22

X
bugfix  
Xiaoyu Wang 已提交
23 24
typedef struct SSlotIdInfo {
  int16_t slotId;
X
Xiaoyu Wang 已提交
25
  bool    set;
X
bugfix  
Xiaoyu Wang 已提交
26 27
} SSlotIdInfo;

X
Xiaoyu Wang 已提交
28 29
typedef struct SSlotIndex {
  int16_t dataBlockId;
X
Xiaoyu Wang 已提交
30
  SArray* pSlotIdsInfo;  // duplicate name slot
X
Xiaoyu Wang 已提交
31 32 33
} SSlotIndex;

typedef struct SPhysiPlanContext {
X
Xiaoyu Wang 已提交
34
  SPlanContext* pPlanCxt;
X
Xiaoyu Wang 已提交
35 36 37
  int32_t       errCode;
  int16_t       nextDataBlockId;
  SArray*       pLocationHelper;
38 39
  bool          hasScan;
  bool          hasSysScan;
X
Xiaoyu Wang 已提交
40 41
} SPhysiPlanContext;

X
Xiaoyu Wang 已提交
42
static int32_t getSlotKey(SNode* pNode, const char* pStmtName, char* pKey) {
X
Xiaoyu Wang 已提交
43 44
  if (QUERY_NODE_COLUMN == nodeType(pNode)) {
    SColumnNode* pCol = (SColumnNode*)pNode;
45 46 47 48 49 50
    if (NULL != pStmtName) {
      if ('\0' != pStmtName[0]) {
        return sprintf(pKey, "%s.%s", pStmtName, pCol->node.aliasName);
      } else {
        return sprintf(pKey, "%s", pCol->node.aliasName);
      }
X
Xiaoyu Wang 已提交
51
    }
X
Xiaoyu Wang 已提交
52 53 54 55 56
    if ('\0' == pCol->tableAlias[0]) {
      return sprintf(pKey, "%s", pCol->colName);
    }
    return sprintf(pKey, "%s.%s", pCol->tableAlias, pCol->colName);
  }
X
Xiaoyu Wang 已提交
57

X
Xiaoyu Wang 已提交
58
  if (NULL != pStmtName && '\0' != pStmtName[0]) {
X
Xiaoyu Wang 已提交
59 60
    return sprintf(pKey, "%s.%s", pStmtName, ((SExprNode*)pNode)->aliasName);
  }
X
Xiaoyu Wang 已提交
61 62 63
  return sprintf(pKey, "%s", ((SExprNode*)pNode)->aliasName);
}

64 65
static SNode* createSlotDesc(SPhysiPlanContext* pCxt, const char* pName, const SNode* pNode, int16_t slotId,
                             bool output, bool reserve) {
X
Xiaoyu Wang 已提交
66
  SSlotDescNode* pSlot = (SSlotDescNode*)nodesMakeNode(QUERY_NODE_SLOT_DESC);
X
bugfix  
Xiaoyu Wang 已提交
67 68 69
  if (NULL == pSlot) {
    return NULL;
  }
X
Xiaoyu Wang 已提交
70
  snprintf(pSlot->name, sizeof(pSlot->name), "%s", pName);
X
Xiaoyu Wang 已提交
71 72
  pSlot->slotId = slotId;
  pSlot->dataType = ((SExprNode*)pNode)->resType;
X
Xiaoyu Wang 已提交
73
  pSlot->reserve = reserve;
X
bugfix  
Xiaoyu Wang 已提交
74
  pSlot->output = output;
X
Xiaoyu Wang 已提交
75 76 77
  return (SNode*)pSlot;
}

X
bugfix  
Xiaoyu Wang 已提交
78
static int32_t createTarget(SNode* pNode, int16_t dataBlockId, int16_t slotId, SNode** pOutput) {
X
Xiaoyu Wang 已提交
79 80
  STargetNode* pTarget = (STargetNode*)nodesMakeNode(QUERY_NODE_TARGET);
  if (NULL == pTarget) {
X
bugfix  
Xiaoyu Wang 已提交
81
    return TSDB_CODE_OUT_OF_MEMORY;
X
Xiaoyu Wang 已提交
82
  }
X
bugfix  
Xiaoyu Wang 已提交
83

X
Xiaoyu Wang 已提交
84 85 86
  pTarget->dataBlockId = dataBlockId;
  pTarget->slotId = slotId;
  pTarget->pExpr = pNode;
X
bugfix  
Xiaoyu Wang 已提交
87 88 89

  *pOutput = (SNode*)pTarget;
  return TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
90 91
}

X
bugfix  
Xiaoyu Wang 已提交
92
static int32_t putSlotToHashImpl(int16_t dataBlockId, int16_t slotId, const char* pName, int32_t len, SHashObj* pHash) {
X
bugfix  
Xiaoyu Wang 已提交
93 94
  SSlotIndex* pIndex = taosHashGet(pHash, pName, len);
  if (NULL != pIndex) {
X
Xiaoyu Wang 已提交
95
    SSlotIdInfo info = {.slotId = slotId, .set = false};
X
bugfix  
Xiaoyu Wang 已提交
96 97 98 99
    taosArrayPush(pIndex->pSlotIdsInfo, &info);
    return TSDB_CODE_SUCCESS;
  }

X
Xiaoyu Wang 已提交
100
  SSlotIndex index = {.dataBlockId = dataBlockId, .pSlotIdsInfo = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SSlotIdInfo))};
X
bugfix  
Xiaoyu Wang 已提交
101 102 103
  if (NULL == index.pSlotIdsInfo) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
104
  SSlotIdInfo info = {.slotId = slotId, .set = false};
X
bugfix  
Xiaoyu Wang 已提交
105
  taosArrayPush(index.pSlotIdsInfo, &info);
X
bugfix  
Xiaoyu Wang 已提交
106 107 108
  return taosHashPut(pHash, pName, len, &index, sizeof(SSlotIndex));
}

109 110
static int32_t putSlotToHash(const char* pName, int16_t dataBlockId, int16_t slotId, SNode* pNode, SHashObj* pHash) {
  return putSlotToHashImpl(dataBlockId, slotId, pName, strlen(pName), pHash);
X
bugfix  
Xiaoyu Wang 已提交
111 112
}

X
Xiaoyu Wang 已提交
113 114
static int32_t createDataBlockDescHash(SPhysiPlanContext* pCxt, int32_t capacity, int16_t dataBlockId,
                                       SHashObj** pDescHash) {
X
bugfix  
Xiaoyu Wang 已提交
115
  SHashObj* pHash = taosHashInit(capacity, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
X
bugfix  
Xiaoyu Wang 已提交
116 117 118 119 120 121 122 123 124 125 126 127
  if (NULL == pHash) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  if (NULL == taosArrayInsert(pCxt->pLocationHelper, dataBlockId, &pHash)) {
    taosHashCleanup(pHash);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  *pDescHash = pHash;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
128 129
static int32_t buildDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc,
                                   SHashObj* pHash) {
X
bugfix  
Xiaoyu Wang 已提交
130
  pDataBlockDesc->pSlots = nodesMakeList();
X
Xiaoyu Wang 已提交
131
  if (NULL == pDataBlockDesc->pSlots) {
X
bugfix  
Xiaoyu Wang 已提交
132 133
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
134

X
bugfix  
Xiaoyu Wang 已提交
135 136
  int32_t code = TSDB_CODE_SUCCESS;
  int16_t slotId = 0;
X
Xiaoyu Wang 已提交
137
  SNode*  pNode = NULL;
X
bugfix  
Xiaoyu Wang 已提交
138
  FOREACH(pNode, pList) {
139 140 141
    char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN];
    getSlotKey(pNode, NULL, name);
    code = nodesListStrictAppend(pDataBlockDesc->pSlots, createSlotDesc(pCxt, name, pNode, slotId, true, false));
X
bugfix  
Xiaoyu Wang 已提交
142
    if (TSDB_CODE_SUCCESS == code) {
143
      code = putSlotToHash(name, pDataBlockDesc->dataBlockId, slotId, pNode, pHash);
X
bugfix  
Xiaoyu Wang 已提交
144 145
    }
    if (TSDB_CODE_SUCCESS == code) {
146 147
      pDataBlockDesc->totalRowSize += ((SExprNode*)pNode)->resType.bytes;
      pDataBlockDesc->outputRowSize += ((SExprNode*)pNode)->resType.bytes;
X
bugfix  
Xiaoyu Wang 已提交
148 149 150
      ++slotId;
    } else {
      break;
X
Xiaoyu Wang 已提交
151
    }
X
bugfix  
Xiaoyu Wang 已提交
152 153 154 155 156
  }
  return code;
}

static int32_t createDataBlockDesc(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode** pDataBlockDesc) {
157
  SDataBlockDescNode* pDesc = (SDataBlockDescNode*)nodesMakeNode(QUERY_NODE_DATABLOCK_DESC);
X
bugfix  
Xiaoyu Wang 已提交
158 159 160 161 162 163
  if (NULL == pDesc) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pDesc->dataBlockId = pCxt->nextDataBlockId++;

  SHashObj* pHash = NULL;
X
Xiaoyu Wang 已提交
164
  int32_t   code = createDataBlockDescHash(pCxt, LIST_LENGTH(pList), pDesc->dataBlockId, &pHash);
X
bugfix  
Xiaoyu Wang 已提交
165 166 167 168 169 170
  if (TSDB_CODE_SUCCESS == code) {
    code = buildDataBlockSlots(pCxt, pList, pDesc, pHash);
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pDataBlockDesc = pDesc;
X
Xiaoyu Wang 已提交
171
  } else {
172
    nodesDestroyNode((SNode*)pDesc);
X
Xiaoyu Wang 已提交
173
  }
X
bugfix  
Xiaoyu Wang 已提交
174 175 176 177

  return code;
}

X
bugfix  
Xiaoyu Wang 已提交
178 179 180 181 182 183 184 185 186 187 188 189
static int16_t getUnsetSlotId(const SArray* pSlotIdsInfo) {
  int32_t size = taosArrayGetSize(pSlotIdsInfo);
  for (int32_t i = 0; i < size; ++i) {
    SSlotIdInfo* pInfo = taosArrayGet(pSlotIdsInfo, i);
    if (!pInfo->set) {
      pInfo->set = true;
      return pInfo->slotId;
    }
  }
  return ((SSlotIdInfo*)taosArrayGet(pSlotIdsInfo, 0))->slotId;
}

X
Xiaoyu Wang 已提交
190
static int32_t addDataBlockSlotsImpl(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc,
X
Xiaoyu Wang 已提交
191
                                     const char* pStmtName, bool output, bool reserve) {
192 193 194 195
  if (NULL == pList) {
    return TSDB_CODE_SUCCESS;
  }

X
Xiaoyu Wang 已提交
196
  int32_t   code = TSDB_CODE_SUCCESS;
X
bugfix  
Xiaoyu Wang 已提交
197
  SHashObj* pHash = taosArrayGetP(pCxt->pLocationHelper, pDataBlockDesc->dataBlockId);
X
Xiaoyu Wang 已提交
198
  int16_t   nextSlotId = LIST_LENGTH(pDataBlockDesc->pSlots), slotId = 0;
X
Xiaoyu Wang 已提交
199
  SNode*    pNode = NULL;
X
Xiaoyu Wang 已提交
200
  FOREACH(pNode, pList) {
X
Xiaoyu Wang 已提交
201 202 203
    SNode*      pExpr = QUERY_NODE_ORDER_BY_EXPR == nodeType(pNode) ? ((SOrderByExprNode*)pNode)->pExpr : pNode;
    char        name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN] = {0};
    int32_t     len = getSlotKey(pExpr, pStmtName, name);
X
bugfix  
Xiaoyu Wang 已提交
204 205
    SSlotIndex* pIndex = taosHashGet(pHash, name, len);
    if (NULL == pIndex) {
206 207
      code =
          nodesListStrictAppend(pDataBlockDesc->pSlots, createSlotDesc(pCxt, name, pExpr, nextSlotId, output, reserve));
X
bugfix  
Xiaoyu Wang 已提交
208 209 210
      if (TSDB_CODE_SUCCESS == code) {
        code = putSlotToHashImpl(pDataBlockDesc->dataBlockId, nextSlotId, name, len, pHash);
      }
211 212 213 214
      pDataBlockDesc->totalRowSize += ((SExprNode*)pExpr)->resType.bytes;
      if (output) {
        pDataBlockDesc->outputRowSize += ((SExprNode*)pExpr)->resType.bytes;
      }
X
bugfix  
Xiaoyu Wang 已提交
215 216 217
      slotId = nextSlotId;
      ++nextSlotId;
    } else {
X
bugfix  
Xiaoyu Wang 已提交
218
      slotId = getUnsetSlotId(pIndex->pSlotIdsInfo);
X
bugfix  
Xiaoyu Wang 已提交
219
    }
220

X
bugfix  
Xiaoyu Wang 已提交
221 222 223 224 225 226 227
    if (TSDB_CODE_SUCCESS == code) {
      SNode* pTarget = NULL;
      code = createTarget(pNode, pDataBlockDesc->dataBlockId, slotId, &pTarget);
      if (TSDB_CODE_SUCCESS == code) {
        REPLACE_NODE(pTarget);
      }
    }
X
Xiaoyu Wang 已提交
228

X
bugfix  
Xiaoyu Wang 已提交
229 230 231
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
X
Xiaoyu Wang 已提交
232
  }
X
bugfix  
Xiaoyu Wang 已提交
233 234 235 236
  return code;
}

static int32_t addDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) {
X
Xiaoyu Wang 已提交
237
  return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, NULL, false, false);
X
Xiaoyu Wang 已提交
238 239
}

240 241 242 243 244 245
static int32_t addDataBlockSlot(SPhysiPlanContext* pCxt, SNode** pNode, SDataBlockDescNode* pDataBlockDesc) {
  if (NULL == pNode || NULL == *pNode) {
    return TSDB_CODE_SUCCESS;
  }

  SNodeList* pList = NULL;
X
Xiaoyu Wang 已提交
246
  int32_t    code = nodesListMakeAppend(&pList, *pNode);
247 248 249 250 251 252 253 254 255 256
  if (TSDB_CODE_SUCCESS == code) {
    code = addDataBlockSlots(pCxt, pList, pDataBlockDesc);
  }
  if (TSDB_CODE_SUCCESS == code) {
    *pNode = nodesListGetNode(pList, 0);
  }
  nodesClearList(pList);
  return code;
}

X
Xiaoyu Wang 已提交
257 258
static int32_t addDataBlockSlotsForProject(SPhysiPlanContext* pCxt, const char* pStmtName, SNodeList* pList,
                                           SDataBlockDescNode* pDataBlockDesc) {
259
  return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, pStmtName, false, false);
X
bugfix  
Xiaoyu Wang 已提交
260 261 262
}

static int32_t pushdownDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) {
X
Xiaoyu Wang 已提交
263
  return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, NULL, true, true);
X
Xiaoyu Wang 已提交
264 265 266
}

typedef struct SSetSlotIdCxt {
X
Xiaoyu Wang 已提交
267
  int32_t   errCode;
X
Xiaoyu Wang 已提交
268 269 270 271
  SHashObj* pLeftHash;
  SHashObj* pRightHash;
} SSetSlotIdCxt;

272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287
static void dumpSlots(const char* pName, SHashObj* pHash) {
  if (NULL == pHash) {
    return;
  }
  planDebug("%s", pName);
  void* pIt = taosHashIterate(pHash, NULL);
  while (NULL != pIt) {
    size_t len = 0;
    char*  pKey = taosHashGetKey(pIt, &len);
    char   name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN] = {0};
    strncpy(name, pKey, len);
    planDebug("\tslot name = %s", name);
    pIt = taosHashIterate(pHash, pIt);
  }
}

X
Xiaoyu Wang 已提交
288 289 290
static EDealRes doSetSlotId(SNode* pNode, void* pContext) {
  if (QUERY_NODE_COLUMN == nodeType(pNode) && 0 != strcmp(((SColumnNode*)pNode)->colName, "*")) {
    SSetSlotIdCxt* pCxt = (SSetSlotIdCxt*)pContext;
X
Xiaoyu Wang 已提交
291 292 293
    char           name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN];
    int32_t        len = getSlotKey(pNode, NULL, name);
    SSlotIndex*    pIndex = taosHashGet(pCxt->pLeftHash, name, len);
X
Xiaoyu Wang 已提交
294 295 296 297
    if (NULL == pIndex) {
      pIndex = taosHashGet(pCxt->pRightHash, name, len);
    }
    // pIndex is definitely not NULL, otherwise it is a bug
X
bugfix  
Xiaoyu Wang 已提交
298
    if (NULL == pIndex) {
299
      planError("doSetSlotId failed, invalid slot name %s", name);
300 301
      dumpSlots("left datablock desc", pCxt->pLeftHash);
      dumpSlots("right datablock desc", pCxt->pRightHash);
302
      pCxt->errCode = TSDB_CODE_PLAN_INTERNAL_ERROR;
X
bugfix  
Xiaoyu Wang 已提交
303 304
      return DEAL_RES_ERROR;
    }
X
Xiaoyu Wang 已提交
305
    ((SColumnNode*)pNode)->dataBlockId = pIndex->dataBlockId;
X
bugfix  
Xiaoyu Wang 已提交
306
    ((SColumnNode*)pNode)->slotId = ((SSlotIdInfo*)taosArrayGet(pIndex->pSlotIdsInfo, 0))->slotId;
X
Xiaoyu Wang 已提交
307 308 309 310 311
    return DEAL_RES_IGNORE_CHILD;
  }
  return DEAL_RES_CONTINUE;
}

X
Xiaoyu Wang 已提交
312 313
static int32_t setNodeSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, int16_t rightDataBlockId, SNode* pNode,
                             SNode** pOutput) {
314 315 316 317
  if (NULL == pNode) {
    return TSDB_CODE_SUCCESS;
  }

X
Xiaoyu Wang 已提交
318
  SNode* pRes = nodesCloneNode(pNode);
X
Xiaoyu Wang 已提交
319 320 321 322 323
  if (NULL == pRes) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SSetSlotIdCxt cxt = {
X
Xiaoyu Wang 已提交
324 325 326
      .errCode = TSDB_CODE_SUCCESS,
      .pLeftHash = taosArrayGetP(pCxt->pLocationHelper, leftDataBlockId),
      .pRightHash = (rightDataBlockId < 0 ? NULL : taosArrayGetP(pCxt->pLocationHelper, rightDataBlockId))};
X
Xiaoyu Wang 已提交
327
  nodesWalkExpr(pRes, doSetSlotId, &cxt);
X
Xiaoyu Wang 已提交
328 329
  if (TSDB_CODE_SUCCESS != cxt.errCode) {
    nodesDestroyNode(pRes);
X
Xiaoyu Wang 已提交
330
    return cxt.errCode;
X
Xiaoyu Wang 已提交
331
  }
X
Xiaoyu Wang 已提交
332 333 334

  *pOutput = pRes;
  return TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
335 336
}

X
Xiaoyu Wang 已提交
337 338
static int32_t setListSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, int16_t rightDataBlockId,
                             const SNodeList* pList, SNodeList** pOutput) {
339 340 341 342
  if (NULL == pList) {
    return TSDB_CODE_SUCCESS;
  }

X
Xiaoyu Wang 已提交
343
  SNodeList* pRes = nodesCloneList(pList);
X
Xiaoyu Wang 已提交
344 345 346 347 348
  if (NULL == pRes) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SSetSlotIdCxt cxt = {
X
Xiaoyu Wang 已提交
349 350 351
      .errCode = TSDB_CODE_SUCCESS,
      .pLeftHash = taosArrayGetP(pCxt->pLocationHelper, leftDataBlockId),
      .pRightHash = (rightDataBlockId < 0 ? NULL : taosArrayGetP(pCxt->pLocationHelper, rightDataBlockId))};
X
Xiaoyu Wang 已提交
352
  nodesWalkExprs(pRes, doSetSlotId, &cxt);
X
Xiaoyu Wang 已提交
353 354
  if (TSDB_CODE_SUCCESS != cxt.errCode) {
    nodesDestroyList(pRes);
X
Xiaoyu Wang 已提交
355
    return cxt.errCode;
X
Xiaoyu Wang 已提交
356
  }
X
Xiaoyu Wang 已提交
357 358
  *pOutput = pRes;
  return TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
359 360
}

X
Xiaoyu Wang 已提交
361
static SPhysiNode* makePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, ENodeType type) {
X
Xiaoyu Wang 已提交
362
  SPhysiNode* pPhysiNode = (SPhysiNode*)nodesMakeNode(type);
X
Xiaoyu Wang 已提交
363 364 365
  if (NULL == pPhysiNode) {
    return NULL;
  }
X
bugfix  
Xiaoyu Wang 已提交
366

367 368
  TSWAP(pPhysiNode->pLimit, pLogicNode->pLimit);
  TSWAP(pPhysiNode->pSlimit, pLogicNode->pSlimit);
369 370
  pPhysiNode->inputTsOrder = pLogicNode->inputTsOrder;
  pPhysiNode->outputTsOrder = pLogicNode->outputTsOrder;
371

X
bugfix  
Xiaoyu Wang 已提交
372 373
  int32_t code = createDataBlockDesc(pCxt, pLogicNode->pTargets, &pPhysiNode->pOutputDataBlockDesc);
  if (TSDB_CODE_SUCCESS != code) {
374
    nodesDestroyNode((SNode*)pPhysiNode);
X
Xiaoyu Wang 已提交
375 376
    return NULL;
  }
X
Xiaoyu Wang 已提交
377
  pPhysiNode->pOutputDataBlockDesc->precision = pLogicNode->precision;
X
Xiaoyu Wang 已提交
378 379 380 381 382
  return pPhysiNode;
}

static int32_t setConditionsSlotId(SPhysiPlanContext* pCxt, const SLogicNode* pLogicNode, SPhysiNode* pPhysiNode) {
  if (NULL != pLogicNode->pConditions) {
X
Xiaoyu Wang 已提交
383 384
    return setNodeSlotId(pCxt, pPhysiNode->pOutputDataBlockDesc->dataBlockId, -1, pLogicNode->pConditions,
                         &pPhysiNode->pConditions);
X
Xiaoyu Wang 已提交
385 386 387 388
  }
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
389 390 391 392 393 394 395 396 397 398 399 400 401
static int32_t colIdCompare(const void* pLeft, const void* pRight) {
  SColumnNode* pLeftCol = *(SColumnNode**)pLeft;
  SColumnNode* pRightCol = *(SColumnNode**)pRight;
  return pLeftCol->colId > pRightCol->colId ? 1 : -1;
}

static int32_t sortScanCols(SNodeList* pScanCols) {
  SArray* pArray = taosArrayInit(LIST_LENGTH(pScanCols), POINTER_BYTES);
  if (NULL == pArray) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNode* pCol = NULL;
X
Xiaoyu Wang 已提交
402
  FOREACH(pCol, pScanCols) { taosArrayPush(pArray, &pCol); }
X
Xiaoyu Wang 已提交
403 404 405
  taosArraySort(pArray, colIdCompare);

  int32_t index = 0;
X
Xiaoyu Wang 已提交
406
  FOREACH(pCol, pScanCols) { REPLACE_NODE(taosArrayGetP(pArray, index++)); }
X
Xiaoyu Wang 已提交
407 408 409 410 411
  taosArrayDestroy(pArray);

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
412
static int32_t createScanCols(SPhysiPlanContext* pCxt, SScanPhysiNode* pScanPhysiNode, SNodeList* pScanCols) {
413 414 415 416
  if (NULL == pScanCols) {
    return TSDB_CODE_SUCCESS;
  }

417 418 419
  pScanPhysiNode->pScanCols = nodesCloneList(pScanCols);
  if (NULL == pScanPhysiNode->pScanCols) {
    return TSDB_CODE_OUT_OF_MEMORY;
420
  }
X
bugfix  
Xiaoyu Wang 已提交
421
  return sortScanCols(pScanPhysiNode->pScanCols);
422 423
}

X
Xiaoyu Wang 已提交
424
static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
X
Xiaoyu Wang 已提交
425
                                           SScanPhysiNode* pScanPhysiNode, SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
426 427
  int32_t code = createScanCols(pCxt, pScanPhysiNode, pScanLogicNode->pScanCols);
  if (TSDB_CODE_SUCCESS == code) {
X
bugfix  
Xiaoyu Wang 已提交
428
    code = addDataBlockSlots(pCxt, pScanPhysiNode->pScanCols, pScanPhysiNode->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
429
  }
430 431 432 433 434 435 436 437 438 439 440 441

  if (TSDB_CODE_SUCCESS == code && NULL != pScanLogicNode->pScanPseudoCols) {
    pScanPhysiNode->pScanPseudoCols = nodesCloneList(pScanLogicNode->pScanPseudoCols);
    if (NULL == pScanPhysiNode->pScanPseudoCols) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = addDataBlockSlots(pCxt, pScanPhysiNode->pScanPseudoCols, pScanPhysiNode->node.pOutputDataBlockDesc);
  }

X
Xiaoyu Wang 已提交
442 443 444
  if (TSDB_CODE_SUCCESS == code) {
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pScanLogicNode, (SPhysiNode*)pScanPhysiNode);
  }
445

X
Xiaoyu Wang 已提交
446
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
447
    pScanPhysiNode->uid = pScanLogicNode->tableId;
X
Xiaoyu Wang 已提交
448
    pScanPhysiNode->suid = pScanLogicNode->stableId;
X
Xiaoyu Wang 已提交
449
    pScanPhysiNode->tableType = pScanLogicNode->tableType;
X
Xiaoyu Wang 已提交
450
    memcpy(&pScanPhysiNode->tableName, &pScanLogicNode->tableName, sizeof(SName));
X
Xiaoyu Wang 已提交
451 452 453 454 455 456
    if (NULL != pScanLogicNode->pTagCond) {
      pSubplan->pTagCond = nodesCloneNode(pScanLogicNode->pTagCond);
      if (NULL == pSubplan->pTagCond) {
        code = TSDB_CODE_OUT_OF_MEMORY;
      }
    }
X
Xiaoyu Wang 已提交
457
  }
X
Xiaoyu Wang 已提交
458

X
Xiaoyu Wang 已提交
459 460 461 462 463 464 465 466 467
  if (TSDB_CODE_SUCCESS == code) {
    if (NULL != pScanLogicNode->pTagIndexCond) {
      pSubplan->pTagIndexCond = nodesCloneNode(pScanLogicNode->pTagIndexCond);
      if (NULL == pSubplan->pTagIndexCond) {
        code = TSDB_CODE_OUT_OF_MEMORY;
      }
    }
  }

X
Xiaoyu Wang 已提交
468 469 470
  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pScanPhysiNode;
  } else {
471
    nodesDestroyNode((SNode*)pScanPhysiNode);
X
Xiaoyu Wang 已提交
472
  }
X
Xiaoyu Wang 已提交
473

X
Xiaoyu Wang 已提交
474
  return code;
X
Xiaoyu Wang 已提交
475 476
}

477 478
static void vgroupInfoToNodeAddr(const SVgroupInfo* vg, SQueryNodeAddr* pNodeAddr) {
  pNodeAddr->nodeId = vg->vgId;
X
Xiaoyu Wang 已提交
479
  pNodeAddr->epSet = vg->epSet;
480 481
}

X
Xiaoyu Wang 已提交
482 483
static ENodeType getScanOperatorType(EScanType scanType) {
  switch (scanType) {
484 485
    case SCAN_TYPE_TAG:
      return QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
X
Xiaoyu Wang 已提交
486 487 488 489 490
    case SCAN_TYPE_TABLE:
      return QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
    case SCAN_TYPE_STREAM:
      return QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
    case SCAN_TYPE_TABLE_MERGE:
S
slzhou 已提交
491
      return QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN;
492 493
    case SCAN_TYPE_BLOCK_INFO:
      return QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN;
494 495
    case SCAN_TYPE_TABLE_COUNT:
      return QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN;
X
Xiaoyu Wang 已提交
496 497 498 499 500 501
    default:
      break;
  }
  return QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
}

502 503 504 505 506 507 508 509 510 511 512
static int32_t createSimpleScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
                                         SPhysiNode** pPhyNode) {
  SScanPhysiNode* pScan =
      (SScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, getScanOperatorType(pScanLogicNode->scanType));
  if (NULL == pScan) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
  return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, pScan, pPhyNode);
}

X
Xiaoyu Wang 已提交
513 514 515 516 517 518 519 520 521 522 523 524 525
static int32_t createLastRowScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
                                          SPhysiNode** pPhyNode) {
  SLastRowScanPhysiNode* pScan =
      (SLastRowScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN);
  if (NULL == pScan) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pScan->pGroupTags = nodesCloneList(pScanLogicNode->pGroupTags);
  if (NULL != pScanLogicNode->pGroupTags && NULL == pScan->pGroupTags) {
    nodesDestroyNode((SNode*)pScan);
    return TSDB_CODE_OUT_OF_MEMORY;
  }
526

X
Xiaoyu Wang 已提交
527
  pScan->groupSort = pScanLogicNode->groupSort;
X
Xiaoyu Wang 已提交
528
  pScan->ignoreNull = pScanLogicNode->igLastNull;
529
  vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
X
Xiaoyu Wang 已提交
530

X
Xiaoyu Wang 已提交
531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548
  return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
}

static int32_t createTableCountScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
                                             SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) {
  STableCountScanPhysiNode* pScan = (STableCountScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode,
                                                                             QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN);
  if (NULL == pScan) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pScan->pGroupTags = nodesCloneList(pScanLogicNode->pGroupTags);
  if (NULL != pScanLogicNode->pGroupTags && NULL == pScan->pGroupTags) {
    nodesDestroyNode((SNode*)pScan);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pScan->groupSort = pScanLogicNode->groupSort;
549 550
  vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);

X
Xiaoyu Wang 已提交
551 552 553
  return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
}

X
Xiaoyu Wang 已提交
554 555
static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
                                        SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
556 557
  STableScanPhysiNode* pTableScan = (STableScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode,
                                                                        getScanOperatorType(pScanLogicNode->scanType));
X
Xiaoyu Wang 已提交
558 559 560 561
  if (NULL == pTableScan) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

X
Xiaoyu Wang 已提交
562
  memcpy(pTableScan->scanSeq, pScanLogicNode->scanSeq, sizeof(pScanLogicNode->scanSeq));
X
Xiaoyu Wang 已提交
563
  pTableScan->scanRange = pScanLogicNode->scanRange;
564
  pTableScan->ratio = pScanLogicNode->ratio;
5
54liuyao 已提交
565 566 567 568
  if (pScanLogicNode->pVgroupList) {
    vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
    pSubplan->execNodeStat.tableNum = pScanLogicNode->pVgroupList->vgroups[0].numOfTable;
  }
D
dapan1121 已提交
569
  tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
570 571
  pTableScan->dataRequired = pScanLogicNode->dataRequired;
  pTableScan->pDynamicScanFuncs = nodesCloneList(pScanLogicNode->pDynamicScanFuncs);
572
  pTableScan->pGroupTags = nodesCloneList(pScanLogicNode->pGroupTags);
X
Xiaoyu Wang 已提交
573
  if ((NULL != pScanLogicNode->pDynamicScanFuncs && NULL == pTableScan->pDynamicScanFuncs) ||
574
      (NULL != pScanLogicNode->pGroupTags && NULL == pTableScan->pGroupTags)) {
575
    nodesDestroyNode((SNode*)pTableScan);
576 577
    return TSDB_CODE_OUT_OF_MEMORY;
  }
578
  pTableScan->groupSort = pScanLogicNode->groupSort;
X
Xiaoyu Wang 已提交
579 580 581 582 583
  pTableScan->interval = pScanLogicNode->interval;
  pTableScan->offset = pScanLogicNode->offset;
  pTableScan->sliding = pScanLogicNode->sliding;
  pTableScan->intervalUnit = pScanLogicNode->intervalUnit;
  pTableScan->slidingUnit = pScanLogicNode->slidingUnit;
5
54liuyao 已提交
584 585
  pTableScan->triggerType = pScanLogicNode->triggerType;
  pTableScan->watermark = pScanLogicNode->watermark;
586
  pTableScan->igExpired = pScanLogicNode->igExpired;
587
  pTableScan->igCheckUpdate = pScanLogicNode->igCheckUpdate;
X
Xiaoyu Wang 已提交
588
  pTableScan->assignBlockUid = pCxt->pPlanCxt->rSmaQuery ? true : false;
X
Xiaoyu Wang 已提交
589

590 591 592 593 594 595 596 597 598 599
  int32_t code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode);
  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pTableScan->scan.node.pOutputDataBlockDesc->dataBlockId, -1, pScanLogicNode->pTags,
                         &pTableScan->pTags);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = setNodeSlotId(pCxt, pTableScan->scan.node.pOutputDataBlockDesc->dataBlockId, -1, pScanLogicNode->pSubtable,
                         &pTableScan->pSubtable);
  }
  return code;
X
Xiaoyu Wang 已提交
600 601
}

X
Xiaoyu Wang 已提交
602 603
static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
                                              SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
604 605
  SSystemTableScanPhysiNode* pScan = (SSystemTableScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode,
                                                                               QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN);
X
Xiaoyu Wang 已提交
606 607 608 609
  if (NULL == pScan) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

610
  pSubplan->showRewrite = pScanLogicNode->showRewrite;
D
dapan1121 已提交
611 612
  pScan->showRewrite = pScanLogicNode->showRewrite;
  pScan->accountId = pCxt->pPlanCxt->acctId;
613
  pScan->sysInfo = pCxt->pPlanCxt->sysInfo;
D
dapan1121 已提交
614
  if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_TABLES) ||
615 616
      0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_TAGS) ||
      0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_COLS)) {
X
Xiaoyu Wang 已提交
617
    vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
D
dapan1121 已提交
618 619 620
  } else {
    pSubplan->execNode.nodeId = MNODE_HANDLE;
    pSubplan->execNode.epSet = pCxt->pPlanCxt->mgmtEpSet;
X
Xiaoyu Wang 已提交
621
  }
622 623 624 625 626
  if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_DNODE_VARIABLES)) {
    pScan->mgmtEpSet = pScanLogicNode->pVgroupList->vgroups->epSet;
  } else {
    pScan->mgmtEpSet = pCxt->pPlanCxt->mgmtEpSet;
  }
D
dapan1121 已提交
627
  tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
X
Xiaoyu Wang 已提交
628

629
  pCxt->hasSysScan = true;
X
Xiaoyu Wang 已提交
630
  return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
X
Xiaoyu Wang 已提交
631 632
}

X
Xiaoyu Wang 已提交
633 634
static int32_t createStreamScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
                                         SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
635 636 637 638 639 640
  return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
}

static int32_t createTableMergeScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
                                             SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) {
  return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
641 642
}

X
Xiaoyu Wang 已提交
643 644
static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
                                   SPhysiNode** pPhyNode) {
645
  pCxt->hasScan = true;
X
Xiaoyu Wang 已提交
646 647
  switch (pScanLogicNode->scanType) {
    case SCAN_TYPE_TAG:
648 649
    case SCAN_TYPE_BLOCK_INFO:
      return createSimpleScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
650 651
    case SCAN_TYPE_TABLE_COUNT:
      return createTableCountScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
652 653
    case SCAN_TYPE_LAST_ROW:
      return createLastRowScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
654
    case SCAN_TYPE_TABLE:
X
Xiaoyu Wang 已提交
655
      return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
656
    case SCAN_TYPE_SYSTEM_TABLE:
X
Xiaoyu Wang 已提交
657
      return createSystemTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
658
    case SCAN_TYPE_STREAM:
X
Xiaoyu Wang 已提交
659
      return createStreamScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
660 661
    case SCAN_TYPE_TABLE_MERGE:
      return createTableMergeScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
662 663 664
    default:
      break;
  }
X
Xiaoyu Wang 已提交
665
  return TSDB_CODE_FAILED;
X
Xiaoyu Wang 已提交
666 667
}

X
Xiaoyu Wang 已提交
668 669
static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode,
                                   SPhysiNode** pPhyNode) {
670 671
  SSortMergeJoinPhysiNode* pJoin =
      (SSortMergeJoinPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pJoinLogicNode, QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN);
X
Xiaoyu Wang 已提交
672 673 674
  if (NULL == pJoin) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
675

676 677
  SDataBlockDescNode* pLeftDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc;
  SDataBlockDescNode* pRightDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 1))->pOutputDataBlockDesc;
X
Xiaoyu Wang 已提交
678
  int32_t             code = TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
679

680
  pJoin->joinType = pJoinLogicNode->joinType;
681
  pJoin->node.inputTsOrder = pJoinLogicNode->node.inputTsOrder;
682 683
  setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pMergeCondition,
                &pJoin->pMergeCondition);
X
Xiaoyu Wang 已提交
684
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
685 686
    code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->node.pTargets,
                         &pJoin->pTargets);
X
Xiaoyu Wang 已提交
687 688
  }
  if (TSDB_CODE_SUCCESS == code) {
X
bugfix  
Xiaoyu Wang 已提交
689
    code = addDataBlockSlots(pCxt, pJoin->pTargets, pJoin->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
690
  }
691 692

  if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pOnConditions) {
X
Xiaoyu Wang 已提交
693 694 695 696 697 698 699 700 701 702
    SNodeList* pCondCols = nodesMakeList();
    if (NULL == pCondCols) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      code = nodesCollectColumnsFromNode(pJoinLogicNode->pOnConditions, NULL, COLLECT_COL_TYPE_ALL, &pCondCols);
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlots(pCxt, pCondCols, pJoin->node.pOutputDataBlockDesc);
    }
    nodesDestroyList(pCondCols);
703 704 705
  }

  if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pOnConditions) {
706 707
    code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1,
                         pJoinLogicNode->pOnConditions, &pJoin->pOnConditions);
708 709
  }

S
slzhou 已提交
710 711
  if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pColEqualOnConditions) {
    code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColEqualOnConditions, &pJoin->pColEqualOnConditions);
712
  }
X
Xiaoyu Wang 已提交
713 714 715
  if (TSDB_CODE_SUCCESS == code) {
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin);
  }
X
Xiaoyu Wang 已提交
716

X
Xiaoyu Wang 已提交
717 718 719
  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pJoin;
  } else {
720
    nodesDestroyNode((SNode*)pJoin);
X
Xiaoyu Wang 已提交
721
  }
X
Xiaoyu Wang 已提交
722

X
Xiaoyu Wang 已提交
723
  return code;
X
Xiaoyu Wang 已提交
724 725 726
}

typedef struct SRewritePrecalcExprsCxt {
X
Xiaoyu Wang 已提交
727 728 729
  int32_t    errCode;
  int32_t    planNodeId;
  int32_t    rewriteId;
X
Xiaoyu Wang 已提交
730 731 732 733 734
  SNodeList* pPrecalcExprs;
} SRewritePrecalcExprsCxt;

static EDealRes collectAndRewrite(SRewritePrecalcExprsCxt* pCxt, SNode** pNode) {
  SNode* pExpr = nodesCloneNode(*pNode);
X
bugfix  
Xiaoyu Wang 已提交
735
  if (NULL == pExpr) {
736
    pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
X
bugfix  
Xiaoyu Wang 已提交
737 738
    return DEAL_RES_ERROR;
  }
X
Xiaoyu Wang 已提交
739
  if (nodesListAppend(pCxt->pPrecalcExprs, pExpr)) {
740
    pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
X
Xiaoyu Wang 已提交
741 742 743 744 745
    nodesDestroyNode(pExpr);
    return DEAL_RES_ERROR;
  }
  SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
  if (NULL == pCol) {
746
    pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
X
Xiaoyu Wang 已提交
747 748 749 750 751 752 753 754
    nodesDestroyNode(pExpr);
    return DEAL_RES_ERROR;
  }
  SExprNode* pRewrittenExpr = (SExprNode*)pExpr;
  pCol->node.resType = pRewrittenExpr->resType;
  if ('\0' != pRewrittenExpr->aliasName[0]) {
    strcpy(pCol->colName, pRewrittenExpr->aliasName);
  } else {
X
Xiaoyu Wang 已提交
755 756
    snprintf(pRewrittenExpr->aliasName, sizeof(pRewrittenExpr->aliasName), "#expr_%d_%d", pCxt->planNodeId,
             pCxt->rewriteId);
X
Xiaoyu Wang 已提交
757 758 759 760 761 762 763
    strcpy(pCol->colName, pRewrittenExpr->aliasName);
  }
  nodesDestroyNode(*pNode);
  *pNode = (SNode*)pCol;
  return DEAL_RES_IGNORE_CHILD;
}

764 765 766 767 768
static int32_t rewriteValueToOperator(SRewritePrecalcExprsCxt* pCxt, SNode** pNode) {
  SOperatorNode* pOper = (SOperatorNode*)nodesMakeNode(QUERY_NODE_OPERATOR);
  if (NULL == pOper) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
769 770
  pOper->pLeft = nodesMakeNode(QUERY_NODE_LEFT_VALUE);
  if (NULL == pOper->pLeft) {
771
    nodesDestroyNode((SNode*)pOper);
772 773
    return TSDB_CODE_OUT_OF_MEMORY;
  }
774 775 776 777
  SValueNode* pVal = (SValueNode*)*pNode;
  pOper->node.resType = pVal->node.resType;
  strcpy(pOper->node.aliasName, pVal->node.aliasName);
  pOper->opType = OP_TYPE_ASSIGN;
778
  pOper->pRight = *pNode;
779 780 781 782
  *pNode = (SNode*)pOper;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
783 784 785
static EDealRes doRewritePrecalcExprs(SNode** pNode, void* pContext) {
  SRewritePrecalcExprsCxt* pCxt = (SRewritePrecalcExprsCxt*)pContext;
  switch (nodeType(*pNode)) {
786
    case QUERY_NODE_VALUE: {
787 788 789
      if (((SValueNode*)*pNode)->notReserved) {
        break;
      }
790 791
      pCxt->errCode = rewriteValueToOperator(pCxt, pNode);
      if (TSDB_CODE_SUCCESS != pCxt->errCode) {
792 793 794 795
        return DEAL_RES_ERROR;
      }
      return collectAndRewrite(pCxt, pNode);
    }
X
Xiaoyu Wang 已提交
796
    case QUERY_NODE_OPERATOR:
D
dapan1121 已提交
797 798
    case QUERY_NODE_LOGIC_CONDITION:
    case QUERY_NODE_CASE_WHEN: {
799
      return collectAndRewrite(pCxt, pNode);
X
Xiaoyu Wang 已提交
800 801
    }
    case QUERY_NODE_FUNCTION: {
802
      if (fmIsScalarFunc(((SFunctionNode*)(*pNode))->funcId)) {
803
        return collectAndRewrite(pCxt, pNode);
X
Xiaoyu Wang 已提交
804 805 806 807 808 809 810 811
      }
    }
    default:
      break;
  }
  return DEAL_RES_CONTINUE;
}

X
Xiaoyu Wang 已提交
812 813
static int32_t rewritePrecalcExprs(SPhysiPlanContext* pCxt, SNodeList* pList, SNodeList** pPrecalcExprs,
                                   SNodeList** pRewrittenList) {
X
Xiaoyu Wang 已提交
814 815 816 817 818 819
  if (NULL == pList) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == *pPrecalcExprs) {
    *pPrecalcExprs = nodesMakeList();
X
bugfix  
Xiaoyu Wang 已提交
820 821 822
    if (NULL == *pPrecalcExprs) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
X
Xiaoyu Wang 已提交
823 824 825
  }
  if (NULL == *pRewrittenList) {
    *pRewrittenList = nodesMakeList();
X
bugfix  
Xiaoyu Wang 已提交
826 827 828
    if (NULL == *pRewrittenList) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
X
Xiaoyu Wang 已提交
829 830 831 832 833 834 835 836 837
  }
  SNode* pNode = NULL;
  FOREACH(pNode, pList) {
    SNode* pNew = NULL;
    if (QUERY_NODE_GROUPING_SET == nodeType(pNode)) {
      pNew = nodesCloneNode(nodesListGetNode(((SGroupingSetNode*)pNode)->pParameterList, 0));
    } else {
      pNew = nodesCloneNode(pNode);
    }
X
bugfix  
Xiaoyu Wang 已提交
838 839 840 841 842 843
    if (NULL == pNew) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    if (TSDB_CODE_SUCCESS != nodesListAppend(*pRewrittenList, pNew)) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
X
Xiaoyu Wang 已提交
844
  }
X
Xiaoyu Wang 已提交
845
  SRewritePrecalcExprsCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pPrecalcExprs = *pPrecalcExprs};
X
Xiaoyu Wang 已提交
846
  nodesRewriteExprs(*pRewrittenList, doRewritePrecalcExprs, &cxt);
847
  if (0 == LIST_LENGTH(cxt.pPrecalcExprs) || TSDB_CODE_SUCCESS != cxt.errCode) {
X
Xiaoyu Wang 已提交
848
    NODES_DESTORY_LIST(*pPrecalcExprs);
X
Xiaoyu Wang 已提交
849 850 851 852
  }
  return cxt.errCode;
}

X
Xiaoyu Wang 已提交
853 854
static int32_t rewritePrecalcExpr(SPhysiPlanContext* pCxt, SNode* pNode, SNodeList** pPrecalcExprs,
                                  SNode** pRewritten) {
855 856 857 858 859
  if (NULL == pNode) {
    return TSDB_CODE_SUCCESS;
  }

  SNodeList* pList = NULL;
X
Xiaoyu Wang 已提交
860
  int32_t    code = nodesListMakeAppend(&pList, pNode);
861 862 863 864 865 866 867 868 869 870 871 872
  SNodeList* pRewrittenList = NULL;
  if (TSDB_CODE_SUCCESS == code) {
    code = rewritePrecalcExprs(pCxt, pList, pPrecalcExprs, &pRewrittenList);
  }
  if (TSDB_CODE_SUCCESS == code) {
    *pRewritten = nodesListGetNode(pRewrittenList, 0);
  }
  nodesClearList(pList);
  nodesClearList(pRewrittenList);
  return code;
}

X
Xiaoyu Wang 已提交
873 874
static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SAggLogicNode* pAggLogicNode,
                                  SPhysiNode** pPhyNode) {
875 876
  SAggPhysiNode* pAgg =
      (SAggPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pAggLogicNode, QUERY_NODE_PHYSICAL_PLAN_HASH_AGG);
X
Xiaoyu Wang 已提交
877 878 879
  if (NULL == pAgg) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
880

881
  pAgg->mergeDataBlock = (GROUP_ACTION_KEEP == pAggLogicNode->node.groupAction ? false : true);
882
  pAgg->groupKeyOptimized = pAggLogicNode->hasGroupKeyOptimized;
883

X
Xiaoyu Wang 已提交
884 885 886
  SNodeList* pPrecalcExprs = NULL;
  SNodeList* pGroupKeys = NULL;
  SNodeList* pAggFuncs = NULL;
X
Xiaoyu Wang 已提交
887
  int32_t    code = rewritePrecalcExprs(pCxt, pAggLogicNode->pGroupKeys, &pPrecalcExprs, &pGroupKeys);
X
Xiaoyu Wang 已提交
888 889 890
  if (TSDB_CODE_SUCCESS == code) {
    code = rewritePrecalcExprs(pCxt, pAggLogicNode->pAggFuncs, &pPrecalcExprs, &pAggFuncs);
  }
X
Xiaoyu Wang 已提交
891

X
Xiaoyu Wang 已提交
892 893
  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  // push down expression to pOutputDataBlockDesc of child node
X
Xiaoyu Wang 已提交
894 895 896
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pAgg->pExprs);
    if (TSDB_CODE_SUCCESS == code) {
X
bugfix  
Xiaoyu Wang 已提交
897
      code = pushdownDataBlockSlots(pCxt, pAgg->pExprs, pChildTupe);
X
Xiaoyu Wang 已提交
898
    }
X
Xiaoyu Wang 已提交
899 900
  }

X
Xiaoyu Wang 已提交
901 902 903
  if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pGroupKeys, &pAgg->pGroupKeys);
    if (TSDB_CODE_SUCCESS == code) {
X
bugfix  
Xiaoyu Wang 已提交
904
      code = addDataBlockSlots(pCxt, pAgg->pGroupKeys, pAgg->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
905
    }
X
Xiaoyu Wang 已提交
906 907
  }

X
Xiaoyu Wang 已提交
908 909 910
  if (TSDB_CODE_SUCCESS == code && NULL != pAggFuncs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pAggFuncs, &pAgg->pAggFuncs);
    if (TSDB_CODE_SUCCESS == code) {
X
bugfix  
Xiaoyu Wang 已提交
911
      code = addDataBlockSlots(pCxt, pAgg->pAggFuncs, pAgg->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
912
    }
X
Xiaoyu Wang 已提交
913 914
  }

X
Xiaoyu Wang 已提交
915 916 917
  if (TSDB_CODE_SUCCESS == code) {
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pAggLogicNode, (SPhysiNode*)pAgg);
  }
X
Xiaoyu Wang 已提交
918

X
Xiaoyu Wang 已提交
919 920 921
  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pAgg;
  } else {
922
    nodesDestroyNode((SNode*)pAgg);
X
Xiaoyu Wang 已提交
923
  }
X
Xiaoyu Wang 已提交
924

X
bugfix  
Xiaoyu Wang 已提交
925 926 927 928
  nodesDestroyList(pPrecalcExprs);
  nodesDestroyList(pGroupKeys);
  nodesDestroyList(pAggFuncs);

X
Xiaoyu Wang 已提交
929
  return code;
X
Xiaoyu Wang 已提交
930 931
}

932 933 934
static int32_t createIndefRowsFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
                                            SIndefRowsFuncLogicNode* pFuncLogicNode, SPhysiNode** pPhyNode) {
  SIndefRowsFuncPhysiNode* pIdfRowsFunc = (SIndefRowsFuncPhysiNode*)makePhysiNode(
X
Xiaoyu Wang 已提交
935
      pCxt, (SLogicNode*)pFuncLogicNode, QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC);
936 937 938 939 940
  if (NULL == pIdfRowsFunc) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNodeList* pPrecalcExprs = NULL;
X
Xiaoyu Wang 已提交
941 942
  SNodeList* pFuncs = NULL;
  int32_t    code = rewritePrecalcExprs(pCxt, pFuncLogicNode->pFuncs, &pPrecalcExprs, &pFuncs);
943

944 945 946 947 948
  if (pIdfRowsFunc->node.inputTsOrder == 0) {
    // default to asc
    pIdfRowsFunc->node.inputTsOrder = TSDB_ORDER_ASC;
  }

949 950 951 952 953 954 955 956 957
  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  // push down expression to pOutputDataBlockDesc of child node
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pIdfRowsFunc->pExprs);
    if (TSDB_CODE_SUCCESS == code) {
      code = pushdownDataBlockSlots(pCxt, pIdfRowsFunc->pExprs, pChildTupe);
    }
  }

X
Xiaoyu Wang 已提交
958 959
  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncs, &pIdfRowsFunc->pFuncs);
960
    if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
961
      code = addDataBlockSlots(pCxt, pIdfRowsFunc->pFuncs, pIdfRowsFunc->node.pOutputDataBlockDesc);
962 963 964
    }
  }

965 966 967 968
  if (TSDB_CODE_SUCCESS == code) {
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pFuncLogicNode, (SPhysiNode*)pIdfRowsFunc);
  }

969 970 971
  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pIdfRowsFunc;
  } else {
972
    nodesDestroyNode((SNode*)pIdfRowsFunc);
973 974
  }

X
Xiaoyu Wang 已提交
975 976 977
  nodesDestroyList(pPrecalcExprs);
  nodesDestroyList(pFuncs);

978 979 980
  return code;
}

X
Xiaoyu Wang 已提交
981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011
static int32_t createInterpFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
                                         SInterpFuncLogicNode* pFuncLogicNode, SPhysiNode** pPhyNode) {
  SInterpFuncPhysiNode* pInterpFunc =
      (SInterpFuncPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pFuncLogicNode, QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC);
  if (NULL == pInterpFunc) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNodeList* pPrecalcExprs = NULL;
  SNodeList* pFuncs = NULL;
  int32_t    code = rewritePrecalcExprs(pCxt, pFuncLogicNode->pFuncs, &pPrecalcExprs, &pFuncs);

  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  // push down expression to pOutputDataBlockDesc of child node
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pInterpFunc->pExprs);
    if (TSDB_CODE_SUCCESS == code) {
      code = pushdownDataBlockSlots(pCxt, pInterpFunc->pExprs, pChildTupe);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncs, &pInterpFunc->pFuncs);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlots(pCxt, pInterpFunc->pFuncs, pInterpFunc->node.pOutputDataBlockDesc);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    pInterpFunc->timeRange = pFuncLogicNode->timeRange;
    pInterpFunc->interval = pFuncLogicNode->interval;
X
Xiaoyu Wang 已提交
1012 1013
    pInterpFunc->fillMode = pFuncLogicNode->fillMode;
    pInterpFunc->pFillValues = nodesCloneNode(pFuncLogicNode->pFillValues);
X
Xiaoyu Wang 已提交
1014
    if (NULL != pFuncLogicNode->pFillValues && NULL == pInterpFunc->pFillValues) {
X
Xiaoyu Wang 已提交
1015 1016
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
X
Xiaoyu Wang 已提交
1017 1018
  }

X
Xiaoyu Wang 已提交
1019 1020 1021 1022
  if (TSDB_CODE_SUCCESS == code) {
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncLogicNode->pTimeSeries, &pInterpFunc->pTimeSeries);
  }

1023 1024 1025 1026
  if (TSDB_CODE_SUCCESS == code) {
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pFuncLogicNode, (SPhysiNode*)pInterpFunc);
  }

X
Xiaoyu Wang 已提交
1027 1028 1029 1030 1031 1032
  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pInterpFunc;
  } else {
    nodesDestroyNode((SNode*)pInterpFunc);
  }

X
Xiaoyu Wang 已提交
1033 1034 1035
  nodesDestroyList(pPrecalcExprs);
  nodesDestroyList(pFuncs);

X
Xiaoyu Wang 已提交
1036 1037 1038
  return code;
}

1039
static bool projectCanMergeDataBlock(SProjectLogicNode* pProject) {
1040 1041 1042
  if (GROUP_ACTION_KEEP == pProject->node.groupAction) {
    return false;
  }
1043 1044 1045 1046
  if (DATA_ORDER_LEVEL_NONE == pProject->node.resultDataOrder) {
    return true;
  }
  if (1 != LIST_LENGTH(pProject->node.pChildren)) {
1047
    return true;
1048 1049 1050 1051 1052
  }
  SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pProject->node.pChildren, 0);
  return DATA_ORDER_LEVEL_GLOBAL == pChild->resultDataOrder ? true : false;
}

X
Xiaoyu Wang 已提交
1053 1054
static int32_t createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
                                      SProjectLogicNode* pProjectLogicNode, SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
1055 1056
  SProjectPhysiNode* pProject =
      (SProjectPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pProjectLogicNode, QUERY_NODE_PHYSICAL_PLAN_PROJECT);
X
Xiaoyu Wang 已提交
1057 1058 1059
  if (NULL == pProject) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
1060

1061
  pProject->mergeDataBlock = projectCanMergeDataBlock(pProjectLogicNode);
1062
  pProject->ignoreGroupId = pProjectLogicNode->ignoreGroupId;
1063

1064 1065 1066 1067 1068 1069 1070 1071 1072 1073
  int32_t code = TSDB_CODE_SUCCESS;
  if (0 == LIST_LENGTH(pChildren)) {
    pProject->pProjections = nodesCloneList(pProjectLogicNode->pProjections);
    if (NULL == pProject->pProjections) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  } else {
    code = setListSlotId(pCxt, ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc->dataBlockId, -1,
                         pProjectLogicNode->pProjections, &pProject->pProjections);
  }
X
Xiaoyu Wang 已提交
1074
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
1075 1076
    code = addDataBlockSlotsForProject(pCxt, pProjectLogicNode->stmtName, pProject->pProjections,
                                       pProject->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
1077 1078 1079 1080
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pProjectLogicNode, (SPhysiNode*)pProject);
  }
X
Xiaoyu Wang 已提交
1081

X
Xiaoyu Wang 已提交
1082 1083 1084
  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pProject;
  } else {
1085
    nodesDestroyNode((SNode*)pProject);
X
Xiaoyu Wang 已提交
1086
  }
X
Xiaoyu Wang 已提交
1087

X
Xiaoyu Wang 已提交
1088
  return code;
X
Xiaoyu Wang 已提交
1089 1090
}

X
Xiaoyu Wang 已提交
1091 1092
static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode,
                                         SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
1093 1094
  SExchangePhysiNode* pExchange =
      (SExchangePhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE);
X
Xiaoyu Wang 已提交
1095 1096 1097
  if (NULL == pExchange) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
1098

1099 1100
  pExchange->srcStartGroupId = pExchangeLogicNode->srcStartGroupId;
  pExchange->srcEndGroupId = pExchangeLogicNode->srcEndGroupId;
X
Xiaoyu Wang 已提交
1101
  pExchange->seqRecvData = pExchangeLogicNode->seqRecvData;
X
Xiaoyu Wang 已提交
1102

1103 1104 1105 1106 1107 1108 1109 1110
  int32_t code = setConditionsSlotId(pCxt, (const SLogicNode*)pExchangeLogicNode, (SPhysiNode*)pExchange);
  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pExchange;
  } else {
    nodesDestroyNode((SNode*)pExchange);
  }

  return code;
X
Xiaoyu Wang 已提交
1111
}
X
Xiaoyu Wang 已提交
1112

X
Xiaoyu Wang 已提交
1113 1114
static int32_t createStreamScanPhysiNodeByExchange(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode,
                                                   SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
1115 1116
  SScanPhysiNode* pScan =
      (SScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
X
Xiaoyu Wang 已提交
1117 1118 1119
  if (NULL == pScan) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
1120

X
bugfix  
Xiaoyu Wang 已提交
1121 1122 1123 1124 1125 1126
  int32_t code = TSDB_CODE_SUCCESS;

  pScan->pScanCols = nodesCloneList(pExchangeLogicNode->node.pTargets);
  if (NULL == pScan->pScanCols) {
    code = TSDB_CODE_OUT_OF_MEMORY;
  }
L
fix  
Liu Jicong 已提交
1127 1128 1129 1130 1131

  if (TSDB_CODE_SUCCESS == code) {
    code = sortScanCols(pScan->pScanCols);
  }

1132 1133 1134
  if (TSDB_CODE_SUCCESS == code) {
    code = sortScanCols(pScan->pScanCols);
  }
X
Xiaoyu Wang 已提交
1135
  if (TSDB_CODE_SUCCESS == code) {
X
bugfix  
Xiaoyu Wang 已提交
1136
    code = addDataBlockSlots(pCxt, pScan->pScanCols, pScan->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
1137
  }
1138 1139 1140
  if (TSDB_CODE_SUCCESS == code) {
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pExchangeLogicNode, (SPhysiNode*)pScan);
  }
X
Xiaoyu Wang 已提交
1141 1142 1143 1144

  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pScan;
  } else {
1145
    nodesDestroyNode((SNode*)pScan);
X
Xiaoyu Wang 已提交
1146 1147 1148 1149 1150
  }

  return code;
}

X
Xiaoyu Wang 已提交
1151 1152
static int32_t createExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode,
                                       SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
1153 1154
  if (pCxt->pPlanCxt->streamQuery) {
    return createStreamScanPhysiNodeByExchange(pCxt, pExchangeLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1155
  } else {
X
Xiaoyu Wang 已提交
1156
    return doCreateExchangePhysiNode(pCxt, pExchangeLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1157
  }
X
Xiaoyu Wang 已提交
1158 1159
}

1160
static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowPhysiNode* pWindow,
1161 1162 1163
                                             SWindowLogicNode* pWindowLogicNode) {
  pWindow->triggerType = pWindowLogicNode->triggerType;
  pWindow->watermark = pWindowLogicNode->watermark;
1164
  pWindow->deleteMark = pWindowLogicNode->deleteMark;
1165
  pWindow->igExpired = pWindowLogicNode->igExpired;
1166
  pWindow->mergeDataBlock = (GROUP_ACTION_KEEP == pWindowLogicNode->node.groupAction ? false : true);
1167 1168 1169 1170 1171
  pWindow->node.inputTsOrder = pWindowLogicNode->node.inputTsOrder;
  pWindow->node.outputTsOrder = pWindowLogicNode->node.outputTsOrder;
  if (nodeType(pWindow) == QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL) {
    pWindow->node.inputTsOrder = pWindowLogicNode->node.outputTsOrder;
  }
1172

X
Xiaoyu Wang 已提交
1173 1174
  SNodeList* pPrecalcExprs = NULL;
  SNodeList* pFuncs = NULL;
X
Xiaoyu Wang 已提交
1175
  int32_t    code = rewritePrecalcExprs(pCxt, pWindowLogicNode->pFuncs, &pPrecalcExprs, &pFuncs);
X
Xiaoyu Wang 已提交
1176 1177 1178 1179 1180 1181

  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  // push down expression to pOutputDataBlockDesc of child node
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pWindow->pExprs);
    if (TSDB_CODE_SUCCESS == code) {
X
bugfix  
Xiaoyu Wang 已提交
1182
      code = addDataBlockSlots(pCxt, pWindow->pExprs, pChildTupe);
X
Xiaoyu Wang 已提交
1183 1184 1185
    }
  }

1186 1187 1188
  if (TSDB_CODE_SUCCESS == code) {
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pWindowLogicNode->pTspk, &pWindow->pTspk);
  }
5
54liuyao 已提交
1189 1190 1191
  if (TSDB_CODE_SUCCESS == code && pWindowLogicNode->pTsEnd) {
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pWindowLogicNode->pTsEnd, &pWindow->pTsEnd);
  }
1192

X
Xiaoyu Wang 已提交
1193 1194 1195
  if (TSDB_CODE_SUCCESS == code && NULL != pFuncs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncs, &pWindow->pFuncs);
    if (TSDB_CODE_SUCCESS == code) {
X
bugfix  
Xiaoyu Wang 已提交
1196
      code = addDataBlockSlots(pCxt, pWindow->pFuncs, pWindow->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
1197 1198 1199
    }
  }

S
slzhou 已提交
1200 1201 1202 1203
  if (TSDB_CODE_SUCCESS == code) {
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pWindowLogicNode, (SPhysiNode*)pWindow);
  }

X
Xiaoyu Wang 已提交
1204 1205 1206
  nodesDestroyList(pPrecalcExprs);
  nodesDestroyList(pFuncs);

X
Xiaoyu Wang 已提交
1207 1208 1209
  return code;
}

1210 1211
static ENodeType getIntervalOperatorType(EWindowAlgorithm windowAlgo) {
  switch (windowAlgo) {
X
Xiaoyu Wang 已提交
1212 1213
    case INTERVAL_ALGO_HASH:
      return QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL;
X
Xiaoyu Wang 已提交
1214
    case INTERVAL_ALGO_MERGE:
1215
      return QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL;
X
Xiaoyu Wang 已提交
1216 1217 1218 1219 1220 1221
    case INTERVAL_ALGO_STREAM_FINAL:
      return QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL;
    case INTERVAL_ALGO_STREAM_SEMI:
      return QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL;
    case INTERVAL_ALGO_STREAM_SINGLE:
      return QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL;
1222 1223 1224 1225 1226 1227 1228 1229
    case SESSION_ALGO_STREAM_FINAL:
      return QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION;
    case SESSION_ALGO_STREAM_SEMI:
      return QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION;
    case SESSION_ALGO_STREAM_SINGLE:
      return QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
    case SESSION_ALGO_MERGE:
      return QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION;
X
Xiaoyu Wang 已提交
1230 1231
    default:
      break;
1232
  }
X
Xiaoyu Wang 已提交
1233
  return QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL;
1234 1235
}

X
Xiaoyu Wang 已提交
1236 1237 1238
static int32_t createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
                                       SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
  SIntervalPhysiNode* pInterval = (SIntervalPhysiNode*)makePhysiNode(
1239
      pCxt, (SLogicNode*)pWindowLogicNode, getIntervalOperatorType(pWindowLogicNode->windowAlgo));
X
Xiaoyu Wang 已提交
1240 1241 1242
  if (NULL == pInterval) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
1243 1244 1245 1246

  pInterval->interval = pWindowLogicNode->interval;
  pInterval->offset = pWindowLogicNode->offset;
  pInterval->sliding = pWindowLogicNode->sliding;
H
Haojun Liao 已提交
1247 1248 1249
  pInterval->intervalUnit = pWindowLogicNode->intervalUnit;
  pInterval->slidingUnit = pWindowLogicNode->slidingUnit;

1250 1251 1252 1253 1254 1255 1256 1257
  int32_t code = createWindowPhysiNodeFinalize(pCxt, pChildren, &pInterval->window, pWindowLogicNode);
  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pInterval;
  } else {
    nodesDestroyNode((SNode*)pInterval);
  }

  return code;
X
Xiaoyu Wang 已提交
1258 1259
}

X
Xiaoyu Wang 已提交
1260 1261 1262
static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
                                            SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
  SSessionWinodwPhysiNode* pSession = (SSessionWinodwPhysiNode*)makePhysiNode(
1263
      pCxt, (SLogicNode*)pWindowLogicNode, getIntervalOperatorType(pWindowLogicNode->windowAlgo));
X
Xiaoyu Wang 已提交
1264 1265
  if (NULL == pSession) {
    return TSDB_CODE_OUT_OF_MEMORY;
X
Xiaoyu Wang 已提交
1266 1267
  }

X
Xiaoyu Wang 已提交
1268
  pSession->gap = pWindowLogicNode->sessionGap;
X
Xiaoyu Wang 已提交
1269

1270 1271 1272 1273 1274 1275 1276 1277
  int32_t code = createWindowPhysiNodeFinalize(pCxt, pChildren, &pSession->window, pWindowLogicNode);
  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pSession;
  } else {
    nodesDestroyNode((SNode*)pSession);
  }

  return code;
X
Xiaoyu Wang 已提交
1278 1279
}

X
Xiaoyu Wang 已提交
1280 1281
static int32_t createStateWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
                                          SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
1282 1283 1284
  SStateWinodwPhysiNode* pState = (SStateWinodwPhysiNode*)makePhysiNode(
      pCxt, (SLogicNode*)pWindowLogicNode,
      (pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE : QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE));
1285 1286 1287 1288 1289
  if (NULL == pState) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNodeList* pPrecalcExprs = NULL;
X
Xiaoyu Wang 已提交
1290 1291
  SNode*     pStateKey = NULL;
  int32_t    code = rewritePrecalcExpr(pCxt, pWindowLogicNode->pStateExpr, &pPrecalcExprs, &pStateKey);
1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308

  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  // push down expression to pOutputDataBlockDesc of child node
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pState->window.pExprs);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlots(pCxt, pState->window.pExprs, pChildTupe);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pStateKey, &pState->pStateKey);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlot(pCxt, &pState->pStateKey, pState->window.node.pOutputDataBlockDesc);
    }
  }

1309 1310 1311 1312 1313 1314 1315
  if (TSDB_CODE_SUCCESS == code) {
    code = createWindowPhysiNodeFinalize(pCxt, pChildren, &pState->window, pWindowLogicNode);
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pState;
  } else {
1316
    nodesDestroyNode((SNode*)pState);
1317 1318
  }

1319 1320 1321 1322
  nodesDestroyList(pPrecalcExprs);
  nodesDestroyNode(pStateKey);

  return code;
1323 1324
}

X
Xiaoyu Wang 已提交
1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351
static int32_t createEventWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
                                          SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
  SEventWinodwPhysiNode* pEvent = (SEventWinodwPhysiNode*)makePhysiNode(
      pCxt, (SLogicNode*)pWindowLogicNode,
      (pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT : QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT));
  if (NULL == pEvent) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  int32_t code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pWindowLogicNode->pStartCond, &pEvent->pStartCond);
  if (TSDB_CODE_SUCCESS == code) {
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pWindowLogicNode->pEndCond, &pEvent->pEndCond);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = createWindowPhysiNodeFinalize(pCxt, pChildren, &pEvent->window, pWindowLogicNode);
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pEvent;
  } else {
    nodesDestroyNode((SNode*)pEvent);
  }

  return code;
}

X
Xiaoyu Wang 已提交
1352 1353
static int32_t createWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode,
                                     SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
1354 1355
  switch (pWindowLogicNode->winType) {
    case WINDOW_TYPE_INTERVAL:
X
Xiaoyu Wang 已提交
1356
      return createIntervalPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1357
    case WINDOW_TYPE_SESSION:
X
Xiaoyu Wang 已提交
1358
      return createSessionWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1359
    case WINDOW_TYPE_STATE:
1360
      return createStateWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1361 1362
    case WINDOW_TYPE_EVENT:
      return createEventWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1363 1364 1365
    default:
      break;
  }
X
Xiaoyu Wang 已提交
1366
  return TSDB_CODE_FAILED;
X
Xiaoyu Wang 已提交
1367 1368
}

X
Xiaoyu Wang 已提交
1369 1370
static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SSortLogicNode* pSortLogicNode,
                                   SPhysiNode** pPhyNode) {
1371 1372 1373
  SSortPhysiNode* pSort = (SSortPhysiNode*)makePhysiNode(
      pCxt, (SLogicNode*)pSortLogicNode,
      pSortLogicNode->groupSort ? QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT : QUERY_NODE_PHYSICAL_PLAN_SORT);
X
Xiaoyu Wang 已提交
1374 1375 1376 1377 1378 1379
  if (NULL == pSort) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNodeList* pPrecalcExprs = NULL;
  SNodeList* pSortKeys = NULL;
X
Xiaoyu Wang 已提交
1380
  int32_t    code = rewritePrecalcExprs(pCxt, pSortLogicNode->pSortKeys, &pPrecalcExprs, &pSortKeys);
X
Xiaoyu Wang 已提交
1381 1382 1383 1384 1385 1386

  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  // push down expression to pOutputDataBlockDesc of child node
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pSort->pExprs);
    if (TSDB_CODE_SUCCESS == code) {
1387
      code = pushdownDataBlockSlots(pCxt, pSort->pExprs, pChildTupe);
X
Xiaoyu Wang 已提交
1388 1389 1390 1391 1392
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pSortKeys, &pSort->pSortKeys);
X
Xiaoyu Wang 已提交
1393 1394 1395 1396
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pSortLogicNode->node.pTargets, &pSort->pTargets);
X
Xiaoyu Wang 已提交
1397
    if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
1398
      code = addDataBlockSlots(pCxt, pSort->pTargets, pSort->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
1399 1400 1401
    }
  }

1402 1403 1404 1405
  if (TSDB_CODE_SUCCESS == code) {
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pSortLogicNode, (SPhysiNode*)pSort);
  }

X
Xiaoyu Wang 已提交
1406 1407 1408
  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pSort;
  } else {
1409
    nodesDestroyNode((SNode*)pSort);
X
Xiaoyu Wang 已提交
1410 1411
  }

X
Xiaoyu Wang 已提交
1412 1413 1414
  nodesDestroyList(pPrecalcExprs);
  nodesDestroyList(pSortKeys);

X
Xiaoyu Wang 已提交
1415 1416 1417
  return code;
}

1418 1419 1420 1421
static int32_t createPartitionPhysiNodeImpl(SPhysiPlanContext* pCxt, SNodeList* pChildren,
                                            SPartitionLogicNode* pPartLogicNode, ENodeType type,
                                            SPhysiNode** pPhyNode) {
  SPartitionPhysiNode* pPart = (SPartitionPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pPartLogicNode, type);
1422 1423 1424 1425 1426 1427
  if (NULL == pPart) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNodeList* pPrecalcExprs = NULL;
  SNodeList* pPartitionKeys = NULL;
X
Xiaoyu Wang 已提交
1428
  int32_t    code = rewritePrecalcExprs(pCxt, pPartLogicNode->pPartitionKeys, &pPrecalcExprs, &pPartitionKeys);
1429 1430 1431 1432 1433 1434

  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  // push down expression to pOutputDataBlockDesc of child node
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pPart->pExprs);
    if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
1435
      code = pushdownDataBlockSlots(pCxt, pPart->pExprs, pChildTupe);
1436 1437 1438 1439 1440
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPartitionKeys, &pPart->pPartitionKeys);
X
Xiaoyu Wang 已提交
1441 1442 1443 1444
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPartLogicNode->node.pTargets, &pPart->pTargets);
1445
    if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
1446
      code = addDataBlockSlots(pCxt, pPart->pTargets, pPart->node.pOutputDataBlockDesc);
1447 1448 1449
    }
  }

1450 1451 1452 1453
  if (TSDB_CODE_SUCCESS == code) {
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pPartLogicNode, (SPhysiNode*)pPart);
  }

1454 1455 1456
  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pPart;
  } else {
1457
    nodesDestroyNode((SNode*)pPart);
1458 1459
  }

X
Xiaoyu Wang 已提交
1460 1461 1462
  nodesDestroyList(pPrecalcExprs);
  nodesDestroyList(pPartitionKeys);

1463 1464 1465
  return code;
}

1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493
static int32_t createStreamPartitionPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
                                              SPartitionLogicNode* pPartLogicNode, SPhysiNode** pPhyNode) {
  SStreamPartitionPhysiNode* pPart = NULL;
  int32_t                    code = createPartitionPhysiNodeImpl(pCxt, pChildren, pPartLogicNode,
                                                                 QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION, (SPhysiNode**)&pPart);
  SDataBlockDescNode*        pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPartLogicNode->pTags, &pPart->pTags);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pPartLogicNode->pSubtable, &pPart->pSubtable);
  }
  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pPart;
  } else {
    nodesDestroyNode((SNode*)pPart);
  }
  return code;
}

static int32_t createPartitionPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
                                        SPartitionLogicNode* pPartLogicNode, SPhysiNode** pPhyNode) {
  if (pCxt->pPlanCxt->streamQuery) {
    return createStreamPartitionPhysiNode(pCxt, pChildren, pPartLogicNode, pPhyNode);
  }
  return createPartitionPhysiNodeImpl(pCxt, pChildren, pPartLogicNode, QUERY_NODE_PHYSICAL_PLAN_PARTITION, pPhyNode);
}

X
Xiaoyu Wang 已提交
1494 1495
static int32_t createFillPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SFillLogicNode* pFillNode,
                                   SPhysiNode** pPhyNode) {
5
54liuyao 已提交
1496 1497 1498
  SFillPhysiNode* pFill = (SFillPhysiNode*)makePhysiNode(
      pCxt, (SLogicNode*)pFillNode,
      pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL : QUERY_NODE_PHYSICAL_PLAN_FILL);
X
Xiaoyu Wang 已提交
1499 1500 1501 1502
  if (NULL == pFill) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

X
Xiaoyu Wang 已提交
1503 1504
  pFill->mode = pFillNode->mode;
  pFill->timeRange = pFillNode->timeRange;
1505
  pFill->node.inputTsOrder = pFillNode->node.inputTsOrder;
X
Xiaoyu Wang 已提交
1506

X
Xiaoyu Wang 已提交
1507
  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
1508 1509 1510 1511 1512 1513 1514
  int32_t code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFillNode->pFillExprs, &pFill->pFillExprs);
  if (TSDB_CODE_SUCCESS == code) {
    code = addDataBlockSlots(pCxt, pFill->pFillExprs, pFill->node.pOutputDataBlockDesc);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFillNode->pNotFillExprs, &pFill->pNotFillExprs);
  }
X
Xiaoyu Wang 已提交
1515
  if (TSDB_CODE_SUCCESS == code) {
1516
    code = addDataBlockSlots(pCxt, pFill->pNotFillExprs, pFill->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
1517 1518 1519
  }

  if (TSDB_CODE_SUCCESS == code) {
1520
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pFillNode->pWStartTs, &pFill->pWStartTs);
X
Xiaoyu Wang 已提交
1521
  }
1522 1523 1524
  if (TSDB_CODE_SUCCESS == code) {
    code = addDataBlockSlot(pCxt, &pFill->pWStartTs, pFill->node.pOutputDataBlockDesc);
  }
X
Xiaoyu Wang 已提交
1525 1526 1527 1528 1529 1530 1531 1532

  if (TSDB_CODE_SUCCESS == code && NULL != pFillNode->pValues) {
    pFill->pValues = nodesCloneNode(pFillNode->pValues);
    if (NULL == pFill->pValues) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

1533 1534 1535 1536
  if (TSDB_CODE_SUCCESS == code) {
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pFillNode, (SPhysiNode*)pFill);
  }

X
Xiaoyu Wang 已提交
1537 1538 1539
  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pFill;
  } else {
1540
    nodesDestroyNode((SNode*)pFill);
X
Xiaoyu Wang 已提交
1541 1542 1543 1544 1545
  }

  return code;
}

X
Xiaoyu Wang 已提交
1546
static int32_t createExchangePhysiNodeByMerge(SMergePhysiNode* pMerge) {
1547
  SExchangePhysiNode* pExchange = (SExchangePhysiNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_EXCHANGE);
X
Xiaoyu Wang 已提交
1548 1549 1550
  if (NULL == pExchange) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
1551 1552
  pExchange->srcStartGroupId = pMerge->srcGroupId;
  pExchange->srcEndGroupId = pMerge->srcGroupId;
D
dapan1121 已提交
1553
  pExchange->singleChannel = true;
X
Xiaoyu Wang 已提交
1554
  pExchange->node.pParent = (SPhysiNode*)pMerge;
1555
  pExchange->node.pOutputDataBlockDesc = (SDataBlockDescNode*)nodesCloneNode((SNode*)pMerge->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
1556
  if (NULL == pExchange->node.pOutputDataBlockDesc) {
1557
    nodesDestroyNode((SNode*)pExchange);
X
Xiaoyu Wang 已提交
1558 1559
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
1560 1561
  SNode* pSlot = NULL;
  FOREACH(pSlot, pExchange->node.pOutputDataBlockDesc->pSlots) { ((SSlotDescNode*)pSlot)->output = true; }
1562
  return nodesListMakeStrictAppend(&pMerge->node.pChildren, (SNode*)pExchange);
X
Xiaoyu Wang 已提交
1563 1564 1565
}

static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SMergeLogicNode* pMergeLogicNode, SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
1566 1567
  SMergePhysiNode* pMerge =
      (SMergePhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pMergeLogicNode, QUERY_NODE_PHYSICAL_PLAN_MERGE);
X
Xiaoyu Wang 已提交
1568 1569 1570 1571 1572 1573
  if (NULL == pMerge) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pMerge->numOfChannels = pMergeLogicNode->numOfChannels;
  pMerge->srcGroupId = pMergeLogicNode->srcGroupId;
1574
  pMerge->groupSort = pMergeLogicNode->groupSort;
D
dapan1121 已提交
1575
  pMerge->ignoreGroupId = pMergeLogicNode->ignoreGroupId;
1576
  pMerge->node.inputTsOrder = pMergeLogicNode->node.outputTsOrder;
X
Xiaoyu Wang 已提交
1577

X
Xiaoyu Wang 已提交
1578
  int32_t code = addDataBlockSlots(pCxt, pMergeLogicNode->pInputs, pMerge->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
1579

X
Xiaoyu Wang 已提交
1580 1581 1582 1583 1584 1585
  if (TSDB_CODE_SUCCESS == code) {
    for (int32_t i = 0; i < pMerge->numOfChannels; ++i) {
      code = createExchangePhysiNodeByMerge(pMerge);
      if (TSDB_CODE_SUCCESS != code) {
        break;
      }
X
Xiaoyu Wang 已提交
1586 1587 1588
    }
  }

1589
  if (TSDB_CODE_SUCCESS == code && NULL != pMergeLogicNode->pMergeKeys) {
X
Xiaoyu Wang 已提交
1590 1591 1592 1593
    code = setListSlotId(pCxt, pMerge->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->pMergeKeys,
                         &pMerge->pMergeKeys);
  }

X
Xiaoyu Wang 已提交
1594 1595 1596 1597
  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pMerge->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->node.pTargets,
                         &pMerge->pTargets);
  }
X
Xiaoyu Wang 已提交
1598 1599 1600
  if (TSDB_CODE_SUCCESS == code) {
    code = addDataBlockSlots(pCxt, pMerge->pTargets, pMerge->node.pOutputDataBlockDesc);
  }
X
Xiaoyu Wang 已提交
1601

X
Xiaoyu Wang 已提交
1602 1603 1604
  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pMerge;
  } else {
1605
    nodesDestroyNode((SNode*)pMerge);
X
Xiaoyu Wang 已提交
1606 1607 1608 1609 1610
  }

  return code;
}

X
Xiaoyu Wang 已提交
1611 1612
static int32_t doCreatePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, SSubplan* pSubplan,
                                 SNodeList* pChildren, SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
1613
  switch (nodeType(pLogicNode)) {
X
Xiaoyu Wang 已提交
1614
    case QUERY_NODE_LOGIC_PLAN_SCAN:
X
Xiaoyu Wang 已提交
1615
      return createScanPhysiNode(pCxt, pSubplan, (SScanLogicNode*)pLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1616
    case QUERY_NODE_LOGIC_PLAN_JOIN:
X
Xiaoyu Wang 已提交
1617
      return createJoinPhysiNode(pCxt, pChildren, (SJoinLogicNode*)pLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1618
    case QUERY_NODE_LOGIC_PLAN_AGG:
X
Xiaoyu Wang 已提交
1619
      return createAggPhysiNode(pCxt, pChildren, (SAggLogicNode*)pLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1620
    case QUERY_NODE_LOGIC_PLAN_PROJECT:
X
Xiaoyu Wang 已提交
1621
      return createProjectPhysiNode(pCxt, pChildren, (SProjectLogicNode*)pLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1622
    case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
X
Xiaoyu Wang 已提交
1623
      return createExchangePhysiNode(pCxt, (SExchangeLogicNode*)pLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1624
    case QUERY_NODE_LOGIC_PLAN_WINDOW:
X
Xiaoyu Wang 已提交
1625
      return createWindowPhysiNode(pCxt, pChildren, (SWindowLogicNode*)pLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1626 1627
    case QUERY_NODE_LOGIC_PLAN_SORT:
      return createSortPhysiNode(pCxt, pChildren, (SSortLogicNode*)pLogicNode, pPhyNode);
1628 1629
    case QUERY_NODE_LOGIC_PLAN_PARTITION:
      return createPartitionPhysiNode(pCxt, pChildren, (SPartitionLogicNode*)pLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1630 1631
    case QUERY_NODE_LOGIC_PLAN_FILL:
      return createFillPhysiNode(pCxt, pChildren, (SFillLogicNode*)pLogicNode, pPhyNode);
1632 1633
    case QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC:
      return createIndefRowsFuncPhysiNode(pCxt, pChildren, (SIndefRowsFuncLogicNode*)pLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1634 1635
    case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC:
      return createInterpFuncPhysiNode(pCxt, pChildren, (SInterpFuncLogicNode*)pLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1636 1637
    case QUERY_NODE_LOGIC_PLAN_MERGE:
      return createMergePhysiNode(pCxt, (SMergeLogicNode*)pLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1638 1639 1640
    default:
      break;
  }
X
Xiaoyu Wang 已提交
1641 1642 1643 1644

  return TSDB_CODE_FAILED;
}

X
Xiaoyu Wang 已提交
1645 1646
static int32_t createPhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, SSubplan* pSubplan,
                               SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658
  SNodeList* pChildren = nodesMakeList();
  if (NULL == pChildren) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  SNode* pLogicChild;
  FOREACH(pLogicChild, pLogicNode->pChildren) {
    SPhysiNode* pChild = NULL;
    code = createPhysiNode(pCxt, (SLogicNode*)pLogicChild, pSubplan, &pChild);
    if (TSDB_CODE_SUCCESS == code) {
1659
      code = nodesListStrictAppend(pChildren, (SNode*)pChild);
X
Xiaoyu Wang 已提交
1660
    }
D
dapan1121 已提交
1661 1662 1663
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
X
Xiaoyu Wang 已提交
1664 1665 1666 1667
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = doCreatePhysiNode(pCxt, pLogicNode, pSubplan, pChildren, pPhyNode);
X
Xiaoyu Wang 已提交
1668
  }
X
Xiaoyu Wang 已提交
1669

X
Xiaoyu Wang 已提交
1670
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
1671 1672 1673 1674 1675 1676 1677
    if (LIST_LENGTH(pChildren) > 0) {
      (*pPhyNode)->pChildren = pChildren;
      SNode* pChild;
      FOREACH(pChild, (*pPhyNode)->pChildren) { ((SPhysiNode*)pChild)->pParent = (*pPhyNode); }
    } else {
      nodesDestroyList(pChildren);
    }
X
Xiaoyu Wang 已提交
1678 1679
  } else {
    nodesDestroyList(pChildren);
X
Xiaoyu Wang 已提交
1680 1681
  }

X
Xiaoyu Wang 已提交
1682
  return code;
X
Xiaoyu Wang 已提交
1683 1684
}

X
Xiaoyu Wang 已提交
1685
static int32_t createDataInserter(SPhysiPlanContext* pCxt, SVgDataBlocks* pBlocks, SDataSinkNode** pSink) {
1686
  SDataInserterNode* pInserter = (SDataInserterNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_INSERT);
X
Xiaoyu Wang 已提交
1687 1688 1689 1690
  if (NULL == pInserter) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

1691 1692
  pInserter->numOfTables = pBlocks->numOfTables;
  pInserter->size = pBlocks->size;
wafwerar's avatar
wafwerar 已提交
1693
  TSWAP(pInserter->pData, pBlocks->pData);
X
Xiaoyu Wang 已提交
1694 1695 1696

  *pSink = (SDataSinkNode*)pInserter;
  return TSDB_CODE_SUCCESS;
1697 1698
}

X
Xiaoyu Wang 已提交
1699
static int32_t createDataDispatcher(SPhysiPlanContext* pCxt, const SPhysiNode* pRoot, SDataSinkNode** pSink) {
1700
  SDataDispatcherNode* pDispatcher = (SDataDispatcherNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_DISPATCH);
X
Xiaoyu Wang 已提交
1701 1702 1703 1704
  if (NULL == pDispatcher) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

1705
  pDispatcher->sink.pInputDataBlockDesc = (SDataBlockDescNode*)nodesCloneNode((SNode*)pRoot->pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
1706
  if (NULL == pDispatcher->sink.pInputDataBlockDesc) {
1707
    nodesDestroyNode((SNode*)pDispatcher);
X
Xiaoyu Wang 已提交
1708 1709 1710 1711 1712
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  *pSink = (SDataSinkNode*)pDispatcher;
  return TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
1713 1714
}

X
Xiaoyu Wang 已提交
1715
static SSubplan* makeSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan) {
1716
  SSubplan* pSubplan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
X
Xiaoyu Wang 已提交
1717 1718 1719
  if (NULL == pSubplan) {
    return NULL;
  }
X
Xiaoyu Wang 已提交
1720
  pSubplan->id = pLogicSubplan->id;
X
Xiaoyu Wang 已提交
1721 1722
  pSubplan->subplanType = pLogicSubplan->subplanType;
  pSubplan->level = pLogicSubplan->level;
X
Xiaoyu Wang 已提交
1723
  if (NULL != pCxt->pPlanCxt->pUser) {
X
Xiaoyu Wang 已提交
1724
    snprintf(pSubplan->user, sizeof(pSubplan->user), "%s", pCxt->pPlanCxt->pUser);
X
Xiaoyu Wang 已提交
1725
  }
X
Xiaoyu Wang 已提交
1726 1727 1728
  return pSubplan;
}

1729
static int32_t buildInsertValuesSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
X
Xiaoyu Wang 已提交
1730
  pSubplan->msgType = pModify->msgType;
D
dapan1121 已提交
1731
  pSubplan->execNode.nodeId = pModify->pVgDataBlocks->vg.vgId;
X
Xiaoyu Wang 已提交
1732 1733 1734 1735
  pSubplan->execNode.epSet = pModify->pVgDataBlocks->vg.epSet;
  return createDataInserter(pCxt, pModify->pVgDataBlocks, &pSubplan->pDataSink);
}

1736
static int32_t createQueryInserter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan,
1737 1738 1739 1740 1741 1742 1743
                                   SDataSinkNode** pSink) {
  SQueryInserterNode* pInserter = (SQueryInserterNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT);
  if (NULL == pInserter) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pInserter->tableId = pModify->tableId;
1744
  pInserter->stableId = pModify->stableId;
1745
  pInserter->tableType = pModify->tableType;
1746
  strcpy(pInserter->tableName, pModify->tableName);
1747 1748
  pInserter->vgId = pModify->pVgroupList->vgroups[0].vgId;
  pInserter->epSet = pModify->pVgroupList->vgroups[0].epSet;
X
Xiaoyu Wang 已提交
1749
  pInserter->explain = (QUERY_NODE_EXPLAIN_STMT == nodeType(pCxt->pPlanCxt->pAstRoot) ? true : false);
1750
  vgroupInfoToNodeAddr(pModify->pVgroupList->vgroups, &pSubplan->execNode);
1751

1752 1753 1754 1755 1756 1757 1758 1759
  int32_t code = setListSlotId(pCxt, pSubplan->pNode->pOutputDataBlockDesc->dataBlockId, -1, pModify->pInsertCols,
                               &pInserter->pCols);
  if (TSDB_CODE_SUCCESS == code) {
    pInserter->sink.pInputDataBlockDesc =
        (SDataBlockDescNode*)nodesCloneNode((SNode*)pSubplan->pNode->pOutputDataBlockDesc);
    if (NULL == pInserter->sink.pInputDataBlockDesc) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
1760 1761 1762 1763 1764 1765 1766 1767
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pSink = (SDataSinkNode*)pInserter;
  } else {
    nodesDestroyNode((SNode*)pInserter);
  }

1768
  return code;
1769 1770 1771 1772 1773 1774
}

static int32_t buildInsertSelectSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
  int32_t code =
      createPhysiNode(pCxt, (SLogicNode*)nodesListGetNode(pModify->node.pChildren, 0), pSubplan, &pSubplan->pNode);
  if (TSDB_CODE_SUCCESS == code) {
1775
    code = createQueryInserter(pCxt, pModify, pSubplan, &pSubplan->pDataSink);
1776
  }
D
dapan1121 已提交
1777
  pSubplan->msgType = TDMT_SCH_MERGE_QUERY;
1778 1779 1780 1781 1782 1783 1784 1785 1786 1787
  return code;
}

static int32_t buildInsertSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
  if (NULL == pModify->node.pChildren) {
    return buildInsertValuesSubplan(pCxt, pModify, pSubplan);
  }
  return buildInsertSelectSubplan(pCxt, pModify, pSubplan);
}

X
Xiaoyu Wang 已提交
1788 1789
static int32_t createDataDeleter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, const SPhysiNode* pRoot,
                                 SDataSinkNode** pSink) {
1790
  SDataDeleterNode* pDeleter = (SDataDeleterNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_DELETE);
X
Xiaoyu Wang 已提交
1791 1792 1793 1794 1795 1796
  if (NULL == pDeleter) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pDeleter->tableId = pModify->tableId;
  pDeleter->tableType = pModify->tableType;
1797
  strcpy(pDeleter->tableFName, pModify->tableName);
1798
  strcpy(pDeleter->tsColName, pModify->tsColName);
X
Xiaoyu Wang 已提交
1799 1800
  pDeleter->deleteTimeRange = pModify->deleteTimeRange;

X
Xiaoyu Wang 已提交
1801 1802
  int32_t code = setNodeSlotId(pCxt, pRoot->pOutputDataBlockDesc->dataBlockId, -1, pModify->pAffectedRows,
                               &pDeleter->pAffectedRows);
1803 1804 1805 1806 1807 1808
  if (TSDB_CODE_SUCCESS == code) {
    code = setNodeSlotId(pCxt, pRoot->pOutputDataBlockDesc->dataBlockId, -1, pModify->pStartTs, &pDeleter->pStartTs);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = setNodeSlotId(pCxt, pRoot->pOutputDataBlockDesc->dataBlockId, -1, pModify->pEndTs, &pDeleter->pEndTs);
  }
X
Xiaoyu Wang 已提交
1809
  if (TSDB_CODE_SUCCESS == code) {
1810
    pDeleter->sink.pInputDataBlockDesc = (SDataBlockDescNode*)nodesCloneNode((SNode*)pRoot->pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
1811 1812 1813 1814 1815 1816 1817 1818
    if (NULL == pDeleter->sink.pInputDataBlockDesc) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pSink = (SDataSinkNode*)pDeleter;
  } else {
1819
    nodesDestroyNode((SNode*)pDeleter);
X
Xiaoyu Wang 已提交
1820 1821 1822 1823 1824 1825 1826 1827 1828 1829 1830
  }

  return TSDB_CODE_SUCCESS;
}

static int32_t buildDeleteSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
  int32_t code =
      createPhysiNode(pCxt, (SLogicNode*)nodesListGetNode(pModify->node.pChildren, 0), pSubplan, &pSubplan->pNode);
  if (TSDB_CODE_SUCCESS == code) {
    code = createDataDeleter(pCxt, pModify, pSubplan->pNode, &pSubplan->pDataSink);
  }
D
dapan1121 已提交
1831
  pSubplan->msgType = TDMT_VND_DELETE;
X
Xiaoyu Wang 已提交
1832 1833 1834 1835 1836 1837 1838 1839 1840 1841 1842 1843 1844 1845 1846 1847 1848 1849 1850 1851
  return code;
}

static int32_t buildVnodeModifySubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SSubplan* pSubplan) {
  int32_t                code = TSDB_CODE_SUCCESS;
  SVnodeModifyLogicNode* pModify = (SVnodeModifyLogicNode*)pLogicSubplan->pNode;
  switch (pModify->modifyType) {
    case MODIFY_TABLE_TYPE_INSERT:
      code = buildInsertSubplan(pCxt, pModify, pSubplan);
      break;
    case MODIFY_TABLE_TYPE_DELETE:
      code = buildDeleteSubplan(pCxt, pModify, pSubplan);
      break;
    default:
      code = TSDB_CODE_FAILED;
      break;
  }
  return code;
}

X
Xiaoyu Wang 已提交
1852
static int32_t createPhysiSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SSubplan** pPhysiSubplan) {
X
Xiaoyu Wang 已提交
1853
  SSubplan* pSubplan = makeSubplan(pCxt, pLogicSubplan);
X
Xiaoyu Wang 已提交
1854 1855 1856 1857 1858 1859
  if (NULL == pSubplan) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;

1860
  if (SUBPLAN_TYPE_MODIFY == pLogicSubplan->subplanType) {
X
Xiaoyu Wang 已提交
1861
    code = buildVnodeModifySubplan(pCxt, pLogicSubplan, pSubplan);
1862
  } else {
D
dapan1121 已提交
1863 1864 1865 1866 1867
    if (SUBPLAN_TYPE_SCAN == pSubplan->subplanType) {
      pSubplan->msgType = TDMT_SCH_QUERY;
    } else {
      pSubplan->msgType = TDMT_SCH_MERGE_QUERY;
    }
X
Xiaoyu Wang 已提交
1868 1869 1870 1871
    code = createPhysiNode(pCxt, pLogicSubplan->pNode, pSubplan, &pSubplan->pNode);
    if (TSDB_CODE_SUCCESS == code && !pCxt->pPlanCxt->streamQuery && !pCxt->pPlanCxt->topicQuery) {
      code = createDataDispatcher(pCxt, pSubplan->pNode, &pSubplan->pDataSink);
    }
1872
  }
X
Xiaoyu Wang 已提交
1873

X
Xiaoyu Wang 已提交
1874 1875 1876
  if (TSDB_CODE_SUCCESS == code) {
    *pPhysiSubplan = pSubplan;
  } else {
1877
    nodesDestroyNode((SNode*)pSubplan);
X
Xiaoyu Wang 已提交
1878
  }
X
Xiaoyu Wang 已提交
1879

X
Xiaoyu Wang 已提交
1880
  return code;
X
Xiaoyu Wang 已提交
1881 1882
}

X
Xiaoyu Wang 已提交
1883
static SQueryPlan* makeQueryPhysiPlan(SPhysiPlanContext* pCxt) {
1884
  SQueryPlan* pPlan = (SQueryPlan*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN);
X
Xiaoyu Wang 已提交
1885 1886 1887 1888 1889
  if (NULL == pPlan) {
    return NULL;
  }
  pPlan->pSubplans = nodesMakeList();
  if (NULL == pPlan->pSubplans) {
1890
    nodesDestroyNode((SNode*)pPlan);
X
Xiaoyu Wang 已提交
1891
    return NULL;
1892
  }
X
Xiaoyu Wang 已提交
1893 1894
  pPlan->queryId = pCxt->pPlanCxt->queryId;
  return pPlan;
1895 1896
}

1897 1898
static int32_t pushSubplan(SPhysiPlanContext* pCxt, SNode* pSubplan, int32_t level, SNodeList* pSubplans) {
  SNodeListNode* pGroup = NULL;
1899
  if (level >= LIST_LENGTH(pSubplans)) {
1900
    pGroup = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST);
X
bugfix  
Xiaoyu Wang 已提交
1901 1902 1903
    if (NULL == pGroup) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
1904
    if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pSubplans, (SNode*)pGroup)) {
X
bugfix  
Xiaoyu Wang 已提交
1905 1906
      return TSDB_CODE_OUT_OF_MEMORY;
    }
1907
  } else {
1908
    pGroup = (SNodeListNode*)nodesListGetNode(pSubplans, level);
1909 1910 1911
  }
  if (NULL == pGroup->pNodeList) {
    pGroup->pNodeList = nodesMakeList();
X
bugfix  
Xiaoyu Wang 已提交
1912 1913 1914
    if (NULL == pGroup->pNodeList) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
1915
  }
X
Xiaoyu Wang 已提交
1916
  return nodesListAppend(pGroup->pNodeList, (SNode*)pSubplan);
1917 1918
}

X
Xiaoyu Wang 已提交
1919 1920
static int32_t buildPhysiPlan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SSubplan* pParent,
                              SQueryPlan* pQueryPlan) {
X
Xiaoyu Wang 已提交
1921
  SSubplan* pSubplan = NULL;
X
Xiaoyu Wang 已提交
1922
  int32_t   code = createPhysiSubplan(pCxt, pLogicSubplan, &pSubplan);
X
Xiaoyu Wang 已提交
1923

X
Xiaoyu Wang 已提交
1924
  if (TSDB_CODE_SUCCESS == code) {
1925
    code = pushSubplan(pCxt, (SNode*)pSubplan, pLogicSubplan->level, pQueryPlan->pSubplans);
X
Xiaoyu Wang 已提交
1926
    ++(pQueryPlan->numOfSubplans);
1927 1928
  }

1929
  if (TSDB_CODE_SUCCESS != code) {
1930
    nodesDestroyNode((SNode*)pSubplan);
1931 1932 1933
    return code;
  }

X
Xiaoyu Wang 已提交
1934
  if (TSDB_CODE_SUCCESS == code && NULL != pParent) {
1935
    code = nodesListMakeAppend(&pParent->pChildren, (SNode*)pSubplan);
X
Xiaoyu Wang 已提交
1936
    if (TSDB_CODE_SUCCESS == code) {
1937
      code = nodesListMakeAppend(&pSubplan->pParents, (SNode*)pParent);
X
Xiaoyu Wang 已提交
1938 1939
    }
  }
X
Xiaoyu Wang 已提交
1940

X
Xiaoyu Wang 已提交
1941 1942 1943 1944 1945 1946
  if (TSDB_CODE_SUCCESS == code) {
    SNode* pChild = NULL;
    FOREACH(pChild, pLogicSubplan->pChildren) {
      code = buildPhysiPlan(pCxt, (SLogicSubplan*)pChild, pSubplan, pQueryPlan);
      if (TSDB_CODE_SUCCESS != code) {
        break;
X
Xiaoyu Wang 已提交
1947 1948 1949
      }
    }
  }
X
Xiaoyu Wang 已提交
1950

X
Xiaoyu Wang 已提交
1951
  return code;
1952 1953
}

X
Xiaoyu Wang 已提交
1954
static int32_t doCreatePhysiPlan(SPhysiPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPhysiPlan) {
1955
  SQueryPlan* pPlan = (SQueryPlan*)makeQueryPhysiPlan(pCxt);
X
Xiaoyu Wang 已提交
1956 1957
  if (NULL == pPlan) {
    return TSDB_CODE_OUT_OF_MEMORY;
1958
  }
X
Xiaoyu Wang 已提交
1959

X
Xiaoyu Wang 已提交
1960
  int32_t code = TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
1961

X
Xiaoyu Wang 已提交
1962 1963 1964 1965 1966 1967
  SNode* pSubplan = NULL;
  FOREACH(pSubplan, pLogicPlan->pTopSubplans) {
    code = buildPhysiPlan(pCxt, (SLogicSubplan*)pSubplan, NULL, pPlan);
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
X
Xiaoyu Wang 已提交
1968 1969
  }

X
Xiaoyu Wang 已提交
1970 1971 1972
  if (TSDB_CODE_SUCCESS == code) {
    *pPhysiPlan = pPlan;
  } else {
1973
    nodesDestroyNode((SNode*)pPlan);
X
Xiaoyu Wang 已提交
1974 1975
  }

X
Xiaoyu Wang 已提交
1976
  return code;
X
Xiaoyu Wang 已提交
1977
}
X
Xiaoyu Wang 已提交
1978

X
Xiaoyu Wang 已提交
1979
static void destoryLocationHash(void* p) {
X
Xiaoyu Wang 已提交
1980
  SHashObj*   pHash = *(SHashObj**)p;
X
bugfix  
Xiaoyu Wang 已提交
1981 1982 1983 1984 1985
  SSlotIndex* pIndex = taosHashIterate(pHash, NULL);
  while (NULL != pIndex) {
    taosArrayDestroy(pIndex->pSlotIdsInfo);
    pIndex = taosHashIterate(pHash, pIndex);
  }
X
Xiaoyu Wang 已提交
1986 1987 1988 1989 1990 1991 1992
  taosHashCleanup(pHash);
}

static void destoryPhysiPlanContext(SPhysiPlanContext* pCxt) {
  taosArrayDestroyEx(pCxt->pLocationHelper, destoryLocationHash);
}

1993 1994 1995 1996 1997
static void setExplainInfo(SPlanContext* pCxt, SQueryPlan* pPlan) {
  if (QUERY_NODE_EXPLAIN_STMT == nodeType(pCxt->pAstRoot)) {
    SExplainStmt* pStmt = (SExplainStmt*)pCxt->pAstRoot;
    pPlan->explainInfo.mode = pStmt->analyze ? EXPLAIN_MODE_ANALYZE : EXPLAIN_MODE_STATIC;
    pPlan->explainInfo.verbose = pStmt->pOptions->verbose;
1998
    pPlan->explainInfo.ratio = pStmt->pOptions->ratio;
1999 2000 2001 2002 2003
  } else {
    pPlan->explainInfo.mode = EXPLAIN_MODE_DISABLE;
  }
}

2004 2005 2006 2007 2008 2009 2010 2011 2012 2013
static void setExecNodeList(SPhysiPlanContext* pCxt, SArray* pExecNodeList) {
  if (NULL == pExecNodeList) {
    return;
  }
  if (pCxt->hasSysScan || !pCxt->hasScan) {
    SQueryNodeLoad node = {.addr = {.nodeId = MNODE_HANDLE, .epSet = pCxt->pPlanCxt->mgmtEpSet}, .load = 0};
    taosArrayPush(pExecNodeList, &node);
  }
}

X
Xiaoyu Wang 已提交
2014
int32_t createPhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan, SArray* pExecNodeList) {
X
Xiaoyu Wang 已提交
2015 2016 2017 2018
  SPhysiPlanContext cxt = {.pPlanCxt = pCxt,
                           .errCode = TSDB_CODE_SUCCESS,
                           .nextDataBlockId = 0,
                           .pLocationHelper = taosArrayInit(32, POINTER_BYTES),
2019 2020
                           .hasScan = false,
                           .hasSysScan = false};
X
Xiaoyu Wang 已提交
2021 2022 2023
  if (NULL == cxt.pLocationHelper) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
2024

2025
  int32_t code = doCreatePhysiPlan(&cxt, pLogicPlan, pPlan);
2026 2027
  if (TSDB_CODE_SUCCESS == code) {
    setExplainInfo(pCxt, *pPlan);
2028
    setExecNodeList(&cxt, pExecNodeList);
2029 2030
  }

X
Xiaoyu Wang 已提交
2031 2032
  destoryPhysiPlanContext(&cxt);
  return code;
X
Xiaoyu Wang 已提交
2033
}