planPhysiCreater.c 56.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

X
Xiaoyu Wang 已提交
16
#include "planInt.h"
X
Xiaoyu Wang 已提交
17

X
Xiaoyu Wang 已提交
18
#include "catalog.h"
X
Xiaoyu Wang 已提交
19
#include "functionMgt.h"
20
#include "systable.h"
X
Xiaoyu Wang 已提交
21
#include "tglobal.h"
X
Xiaoyu Wang 已提交
22

X
bugfix  
Xiaoyu Wang 已提交
23 24
typedef struct SSlotIdInfo {
  int16_t slotId;
X
Xiaoyu Wang 已提交
25
  bool    set;
X
bugfix  
Xiaoyu Wang 已提交
26 27
} SSlotIdInfo;

X
Xiaoyu Wang 已提交
28 29
typedef struct SSlotIndex {
  int16_t dataBlockId;
X
Xiaoyu Wang 已提交
30
  SArray* pSlotIdsInfo;  // duplicate name slot
X
Xiaoyu Wang 已提交
31 32 33
} SSlotIndex;

typedef struct SPhysiPlanContext {
X
Xiaoyu Wang 已提交
34
  SPlanContext* pPlanCxt;
X
Xiaoyu Wang 已提交
35 36 37 38
  int32_t       errCode;
  int16_t       nextDataBlockId;
  SArray*       pLocationHelper;
  SArray*       pExecNodeList;
X
Xiaoyu Wang 已提交
39 40
} SPhysiPlanContext;

X
Xiaoyu Wang 已提交
41
static int32_t getSlotKey(SNode* pNode, const char* pStmtName, char* pKey) {
X
Xiaoyu Wang 已提交
42 43
  if (QUERY_NODE_COLUMN == nodeType(pNode)) {
    SColumnNode* pCol = (SColumnNode*)pNode;
X
Xiaoyu Wang 已提交
44
    if (NULL != pStmtName && '\0' != pStmtName[0]) {
X
Xiaoyu Wang 已提交
45 46
      return sprintf(pKey, "%s.%s", pStmtName, pCol->node.aliasName);
    }
X
Xiaoyu Wang 已提交
47 48 49 50 51
    if ('\0' == pCol->tableAlias[0]) {
      return sprintf(pKey, "%s", pCol->colName);
    }
    return sprintf(pKey, "%s.%s", pCol->tableAlias, pCol->colName);
  }
X
Xiaoyu Wang 已提交
52

X
Xiaoyu Wang 已提交
53
  if (NULL != pStmtName && '\0' != pStmtName[0]) {
X
Xiaoyu Wang 已提交
54 55
    return sprintf(pKey, "%s.%s", pStmtName, ((SExprNode*)pNode)->aliasName);
  }
X
Xiaoyu Wang 已提交
56 57 58
  return sprintf(pKey, "%s", ((SExprNode*)pNode)->aliasName);
}

X
bugfix  
Xiaoyu Wang 已提交
59
static SNode* createSlotDesc(SPhysiPlanContext* pCxt, const SNode* pNode, int16_t slotId, bool output) {
X
Xiaoyu Wang 已提交
60
  SSlotDescNode* pSlot = (SSlotDescNode*)nodesMakeNode(QUERY_NODE_SLOT_DESC);
X
bugfix  
Xiaoyu Wang 已提交
61 62 63
  if (NULL == pSlot) {
    return NULL;
  }
X
Xiaoyu Wang 已提交
64 65 66
  pSlot->slotId = slotId;
  pSlot->dataType = ((SExprNode*)pNode)->resType;
  pSlot->reserve = false;
X
bugfix  
Xiaoyu Wang 已提交
67
  pSlot->output = output;
X
Xiaoyu Wang 已提交
68 69 70
  return (SNode*)pSlot;
}

X
bugfix  
Xiaoyu Wang 已提交
71
static int32_t createTarget(SNode* pNode, int16_t dataBlockId, int16_t slotId, SNode** pOutput) {
X
Xiaoyu Wang 已提交
72 73
  STargetNode* pTarget = (STargetNode*)nodesMakeNode(QUERY_NODE_TARGET);
  if (NULL == pTarget) {
X
bugfix  
Xiaoyu Wang 已提交
74
    return TSDB_CODE_OUT_OF_MEMORY;
X
Xiaoyu Wang 已提交
75
  }
X
bugfix  
Xiaoyu Wang 已提交
76

X
Xiaoyu Wang 已提交
77 78 79
  pTarget->dataBlockId = dataBlockId;
  pTarget->slotId = slotId;
  pTarget->pExpr = pNode;
X
bugfix  
Xiaoyu Wang 已提交
80 81 82

  *pOutput = (SNode*)pTarget;
  return TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
83 84
}

X
bugfix  
Xiaoyu Wang 已提交
85
static int32_t putSlotToHashImpl(int16_t dataBlockId, int16_t slotId, const char* pName, int32_t len, SHashObj* pHash) {
X
bugfix  
Xiaoyu Wang 已提交
86 87
  SSlotIndex* pIndex = taosHashGet(pHash, pName, len);
  if (NULL != pIndex) {
X
Xiaoyu Wang 已提交
88
    SSlotIdInfo info = {.slotId = slotId, .set = false};
X
bugfix  
Xiaoyu Wang 已提交
89 90 91 92
    taosArrayPush(pIndex->pSlotIdsInfo, &info);
    return TSDB_CODE_SUCCESS;
  }

X
Xiaoyu Wang 已提交
93
  SSlotIndex index = {.dataBlockId = dataBlockId, .pSlotIdsInfo = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SSlotIdInfo))};
X
bugfix  
Xiaoyu Wang 已提交
94 95 96
  if (NULL == index.pSlotIdsInfo) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
97
  SSlotIdInfo info = {.slotId = slotId, .set = false};
X
bugfix  
Xiaoyu Wang 已提交
98
  taosArrayPush(index.pSlotIdsInfo, &info);
X
bugfix  
Xiaoyu Wang 已提交
99 100 101 102
  return taosHashPut(pHash, pName, len, &index, sizeof(SSlotIndex));
}

static int32_t putSlotToHash(int16_t dataBlockId, int16_t slotId, SNode* pNode, SHashObj* pHash) {
X
Xiaoyu Wang 已提交
103
  char    name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN];
X
Xiaoyu Wang 已提交
104
  int32_t len = getSlotKey(pNode, NULL, name);
X
bugfix  
Xiaoyu Wang 已提交
105 106 107
  return putSlotToHashImpl(dataBlockId, slotId, name, len, pHash);
}

X
Xiaoyu Wang 已提交
108 109
static int32_t createDataBlockDescHash(SPhysiPlanContext* pCxt, int32_t capacity, int16_t dataBlockId,
                                       SHashObj** pDescHash) {
X
bugfix  
Xiaoyu Wang 已提交
110
  SHashObj* pHash = taosHashInit(capacity, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
X
bugfix  
Xiaoyu Wang 已提交
111 112 113 114 115 116 117 118 119 120 121 122
  if (NULL == pHash) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  if (NULL == taosArrayInsert(pCxt->pLocationHelper, dataBlockId, &pHash)) {
    taosHashCleanup(pHash);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  *pDescHash = pHash;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
123 124
static int32_t buildDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc,
                                   SHashObj* pHash) {
X
bugfix  
Xiaoyu Wang 已提交
125
  pDataBlockDesc->pSlots = nodesMakeList();
X
Xiaoyu Wang 已提交
126
  if (NULL == pDataBlockDesc->pSlots) {
X
bugfix  
Xiaoyu Wang 已提交
127 128
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
129

X
bugfix  
Xiaoyu Wang 已提交
130 131
  int32_t code = TSDB_CODE_SUCCESS;
  int16_t slotId = 0;
X
Xiaoyu Wang 已提交
132
  SNode*  pNode = NULL;
X
bugfix  
Xiaoyu Wang 已提交
133 134 135 136 137 138
  FOREACH(pNode, pList) {
    code = nodesListStrictAppend(pDataBlockDesc->pSlots, createSlotDesc(pCxt, pNode, slotId, true));
    if (TSDB_CODE_SUCCESS == code) {
      code = putSlotToHash(pDataBlockDesc->dataBlockId, slotId, pNode, pHash);
    }
    if (TSDB_CODE_SUCCESS == code) {
139 140
      pDataBlockDesc->totalRowSize += ((SExprNode*)pNode)->resType.bytes;
      pDataBlockDesc->outputRowSize += ((SExprNode*)pNode)->resType.bytes;
X
bugfix  
Xiaoyu Wang 已提交
141 142 143
      ++slotId;
    } else {
      break;
X
Xiaoyu Wang 已提交
144
    }
X
bugfix  
Xiaoyu Wang 已提交
145 146 147 148 149 150 151 152 153 154 155 156
  }
  return code;
}

static int32_t createDataBlockDesc(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode** pDataBlockDesc) {
  SDataBlockDescNode* pDesc = nodesMakeNode(QUERY_NODE_DATABLOCK_DESC);
  if (NULL == pDesc) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pDesc->dataBlockId = pCxt->nextDataBlockId++;

  SHashObj* pHash = NULL;
X
Xiaoyu Wang 已提交
157
  int32_t   code = createDataBlockDescHash(pCxt, LIST_LENGTH(pList), pDesc->dataBlockId, &pHash);
X
bugfix  
Xiaoyu Wang 已提交
158 159 160 161 162 163
  if (TSDB_CODE_SUCCESS == code) {
    code = buildDataBlockSlots(pCxt, pList, pDesc, pHash);
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pDataBlockDesc = pDesc;
X
Xiaoyu Wang 已提交
164
  } else {
X
bugfix  
Xiaoyu Wang 已提交
165
    nodesDestroyNode(pDesc);
X
Xiaoyu Wang 已提交
166
  }
X
bugfix  
Xiaoyu Wang 已提交
167 168 169 170

  return code;
}

X
bugfix  
Xiaoyu Wang 已提交
171 172 173 174 175 176 177 178 179 180 181 182
static int16_t getUnsetSlotId(const SArray* pSlotIdsInfo) {
  int32_t size = taosArrayGetSize(pSlotIdsInfo);
  for (int32_t i = 0; i < size; ++i) {
    SSlotIdInfo* pInfo = taosArrayGet(pSlotIdsInfo, i);
    if (!pInfo->set) {
      pInfo->set = true;
      return pInfo->slotId;
    }
  }
  return ((SSlotIdInfo*)taosArrayGet(pSlotIdsInfo, 0))->slotId;
}

X
Xiaoyu Wang 已提交
183 184
static int32_t addDataBlockSlotsImpl(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc,
                                     const char* pStmtName, bool output) {
185 186 187 188
  if (NULL == pList) {
    return TSDB_CODE_SUCCESS;
  }

X
Xiaoyu Wang 已提交
189
  int32_t   code = TSDB_CODE_SUCCESS;
X
bugfix  
Xiaoyu Wang 已提交
190
  SHashObj* pHash = taosArrayGetP(pCxt->pLocationHelper, pDataBlockDesc->dataBlockId);
X
Xiaoyu Wang 已提交
191 192
  int16_t   nextSlotId = taosHashGetSize(pHash), slotId = 0;
  SNode*    pNode = NULL;
X
Xiaoyu Wang 已提交
193
  FOREACH(pNode, pList) {
X
Xiaoyu Wang 已提交
194 195 196
    SNode*      pExpr = QUERY_NODE_ORDER_BY_EXPR == nodeType(pNode) ? ((SOrderByExprNode*)pNode)->pExpr : pNode;
    char        name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN] = {0};
    int32_t     len = getSlotKey(pExpr, pStmtName, name);
X
bugfix  
Xiaoyu Wang 已提交
197 198
    SSlotIndex* pIndex = taosHashGet(pHash, name, len);
    if (NULL == pIndex) {
199
      code = nodesListStrictAppend(pDataBlockDesc->pSlots, createSlotDesc(pCxt, pExpr, nextSlotId, output));
X
bugfix  
Xiaoyu Wang 已提交
200 201 202
      if (TSDB_CODE_SUCCESS == code) {
        code = putSlotToHashImpl(pDataBlockDesc->dataBlockId, nextSlotId, name, len, pHash);
      }
203 204 205 206
      pDataBlockDesc->totalRowSize += ((SExprNode*)pExpr)->resType.bytes;
      if (output) {
        pDataBlockDesc->outputRowSize += ((SExprNode*)pExpr)->resType.bytes;
      }
X
bugfix  
Xiaoyu Wang 已提交
207 208 209
      slotId = nextSlotId;
      ++nextSlotId;
    } else {
X
bugfix  
Xiaoyu Wang 已提交
210
      slotId = getUnsetSlotId(pIndex->pSlotIdsInfo);
X
bugfix  
Xiaoyu Wang 已提交
211
    }
212

X
bugfix  
Xiaoyu Wang 已提交
213 214 215 216 217 218 219
    if (TSDB_CODE_SUCCESS == code) {
      SNode* pTarget = NULL;
      code = createTarget(pNode, pDataBlockDesc->dataBlockId, slotId, &pTarget);
      if (TSDB_CODE_SUCCESS == code) {
        REPLACE_NODE(pTarget);
      }
    }
X
Xiaoyu Wang 已提交
220

X
bugfix  
Xiaoyu Wang 已提交
221 222 223
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
X
Xiaoyu Wang 已提交
224
  }
X
bugfix  
Xiaoyu Wang 已提交
225 226 227 228
  return code;
}

static int32_t addDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) {
X
Xiaoyu Wang 已提交
229 230 231
  return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, NULL, false);
}

232 233 234 235 236 237
static int32_t addDataBlockSlot(SPhysiPlanContext* pCxt, SNode** pNode, SDataBlockDescNode* pDataBlockDesc) {
  if (NULL == pNode || NULL == *pNode) {
    return TSDB_CODE_SUCCESS;
  }

  SNodeList* pList = NULL;
X
Xiaoyu Wang 已提交
238
  int32_t    code = nodesListMakeAppend(&pList, *pNode);
239 240 241 242 243 244 245 246 247 248
  if (TSDB_CODE_SUCCESS == code) {
    code = addDataBlockSlots(pCxt, pList, pDataBlockDesc);
  }
  if (TSDB_CODE_SUCCESS == code) {
    *pNode = nodesListGetNode(pList, 0);
  }
  nodesClearList(pList);
  return code;
}

X
Xiaoyu Wang 已提交
249 250
static int32_t addDataBlockSlotsForProject(SPhysiPlanContext* pCxt, const char* pStmtName, SNodeList* pList,
                                           SDataBlockDescNode* pDataBlockDesc) {
X
Xiaoyu Wang 已提交
251
  return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, pStmtName, true);
X
bugfix  
Xiaoyu Wang 已提交
252 253 254
}

static int32_t pushdownDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) {
X
Xiaoyu Wang 已提交
255
  return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, NULL, true);
X
Xiaoyu Wang 已提交
256 257 258
}

typedef struct SSetSlotIdCxt {
X
Xiaoyu Wang 已提交
259
  int32_t   errCode;
X
Xiaoyu Wang 已提交
260 261 262 263
  SHashObj* pLeftHash;
  SHashObj* pRightHash;
} SSetSlotIdCxt;

264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279
static void dumpSlots(const char* pName, SHashObj* pHash) {
  if (NULL == pHash) {
    return;
  }
  planDebug("%s", pName);
  void* pIt = taosHashIterate(pHash, NULL);
  while (NULL != pIt) {
    size_t len = 0;
    char*  pKey = taosHashGetKey(pIt, &len);
    char   name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN] = {0};
    strncpy(name, pKey, len);
    planDebug("\tslot name = %s", name);
    pIt = taosHashIterate(pHash, pIt);
  }
}

X
Xiaoyu Wang 已提交
280 281 282
static EDealRes doSetSlotId(SNode* pNode, void* pContext) {
  if (QUERY_NODE_COLUMN == nodeType(pNode) && 0 != strcmp(((SColumnNode*)pNode)->colName, "*")) {
    SSetSlotIdCxt* pCxt = (SSetSlotIdCxt*)pContext;
X
Xiaoyu Wang 已提交
283 284 285
    char           name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN];
    int32_t        len = getSlotKey(pNode, NULL, name);
    SSlotIndex*    pIndex = taosHashGet(pCxt->pLeftHash, name, len);
X
Xiaoyu Wang 已提交
286 287 288 289
    if (NULL == pIndex) {
      pIndex = taosHashGet(pCxt->pRightHash, name, len);
    }
    // pIndex is definitely not NULL, otherwise it is a bug
X
bugfix  
Xiaoyu Wang 已提交
290
    if (NULL == pIndex) {
291
      planError("doSetSlotId failed, invalid slot name %s", name);
292 293
      dumpSlots("left datablock desc", pCxt->pLeftHash);
      dumpSlots("right datablock desc", pCxt->pRightHash);
294
      pCxt->errCode = TSDB_CODE_PLAN_INTERNAL_ERROR;
X
bugfix  
Xiaoyu Wang 已提交
295 296
      return DEAL_RES_ERROR;
    }
X
Xiaoyu Wang 已提交
297
    ((SColumnNode*)pNode)->dataBlockId = pIndex->dataBlockId;
X
bugfix  
Xiaoyu Wang 已提交
298
    ((SColumnNode*)pNode)->slotId = ((SSlotIdInfo*)taosArrayGet(pIndex->pSlotIdsInfo, 0))->slotId;
X
Xiaoyu Wang 已提交
299 300 301 302 303
    return DEAL_RES_IGNORE_CHILD;
  }
  return DEAL_RES_CONTINUE;
}

X
Xiaoyu Wang 已提交
304 305
static int32_t setNodeSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, int16_t rightDataBlockId, SNode* pNode,
                             SNode** pOutput) {
X
Xiaoyu Wang 已提交
306
  SNode* pRes = nodesCloneNode(pNode);
X
Xiaoyu Wang 已提交
307 308 309 310 311
  if (NULL == pRes) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SSetSlotIdCxt cxt = {
X
Xiaoyu Wang 已提交
312 313 314
      .errCode = TSDB_CODE_SUCCESS,
      .pLeftHash = taosArrayGetP(pCxt->pLocationHelper, leftDataBlockId),
      .pRightHash = (rightDataBlockId < 0 ? NULL : taosArrayGetP(pCxt->pLocationHelper, rightDataBlockId))};
X
Xiaoyu Wang 已提交
315
  nodesWalkExpr(pRes, doSetSlotId, &cxt);
X
Xiaoyu Wang 已提交
316 317
  if (TSDB_CODE_SUCCESS != cxt.errCode) {
    nodesDestroyNode(pRes);
X
Xiaoyu Wang 已提交
318
    return cxt.errCode;
X
Xiaoyu Wang 已提交
319
  }
X
Xiaoyu Wang 已提交
320 321 322

  *pOutput = pRes;
  return TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
323 324
}

X
Xiaoyu Wang 已提交
325 326
static int32_t setListSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, int16_t rightDataBlockId,
                             const SNodeList* pList, SNodeList** pOutput) {
X
Xiaoyu Wang 已提交
327
  SNodeList* pRes = nodesCloneList(pList);
X
Xiaoyu Wang 已提交
328 329 330 331 332
  if (NULL == pRes) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SSetSlotIdCxt cxt = {
X
Xiaoyu Wang 已提交
333 334 335
      .errCode = TSDB_CODE_SUCCESS,
      .pLeftHash = taosArrayGetP(pCxt->pLocationHelper, leftDataBlockId),
      .pRightHash = (rightDataBlockId < 0 ? NULL : taosArrayGetP(pCxt->pLocationHelper, rightDataBlockId))};
X
Xiaoyu Wang 已提交
336
  nodesWalkExprs(pRes, doSetSlotId, &cxt);
X
Xiaoyu Wang 已提交
337 338
  if (TSDB_CODE_SUCCESS != cxt.errCode) {
    nodesDestroyList(pRes);
X
Xiaoyu Wang 已提交
339
    return cxt.errCode;
X
Xiaoyu Wang 已提交
340
  }
X
Xiaoyu Wang 已提交
341 342
  *pOutput = pRes;
  return TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
343 344
}

X
Xiaoyu Wang 已提交
345
static SPhysiNode* makePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, ENodeType type) {
X
Xiaoyu Wang 已提交
346
  SPhysiNode* pPhysiNode = (SPhysiNode*)nodesMakeNode(type);
X
Xiaoyu Wang 已提交
347 348 349
  if (NULL == pPhysiNode) {
    return NULL;
  }
X
bugfix  
Xiaoyu Wang 已提交
350 351 352

  int32_t code = createDataBlockDesc(pCxt, pLogicNode->pTargets, &pPhysiNode->pOutputDataBlockDesc);
  if (TSDB_CODE_SUCCESS != code) {
X
Xiaoyu Wang 已提交
353
    nodesDestroyNode(pPhysiNode);
X
Xiaoyu Wang 已提交
354 355
    return NULL;
  }
X
Xiaoyu Wang 已提交
356
  pPhysiNode->pOutputDataBlockDesc->precision = pLogicNode->precision;
X
Xiaoyu Wang 已提交
357 358 359 360 361
  return pPhysiNode;
}

static int32_t setConditionsSlotId(SPhysiPlanContext* pCxt, const SLogicNode* pLogicNode, SPhysiNode* pPhysiNode) {
  if (NULL != pLogicNode->pConditions) {
X
Xiaoyu Wang 已提交
362 363
    return setNodeSlotId(pCxt, pPhysiNode->pOutputDataBlockDesc->dataBlockId, -1, pLogicNode->pConditions,
                         &pPhysiNode->pConditions);
X
Xiaoyu Wang 已提交
364 365 366 367
  }
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
368 369 370 371 372 373 374 375 376 377 378 379 380
static int32_t colIdCompare(const void* pLeft, const void* pRight) {
  SColumnNode* pLeftCol = *(SColumnNode**)pLeft;
  SColumnNode* pRightCol = *(SColumnNode**)pRight;
  return pLeftCol->colId > pRightCol->colId ? 1 : -1;
}

static int32_t sortScanCols(SNodeList* pScanCols) {
  SArray* pArray = taosArrayInit(LIST_LENGTH(pScanCols), POINTER_BYTES);
  if (NULL == pArray) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNode* pCol = NULL;
X
Xiaoyu Wang 已提交
381
  FOREACH(pCol, pScanCols) { taosArrayPush(pArray, &pCol); }
X
Xiaoyu Wang 已提交
382 383 384
  taosArraySort(pArray, colIdCompare);

  int32_t index = 0;
X
Xiaoyu Wang 已提交
385
  FOREACH(pCol, pScanCols) { REPLACE_NODE(taosArrayGetP(pArray, index++)); }
X
Xiaoyu Wang 已提交
386 387 388 389 390
  taosArrayDestroy(pArray);

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
391
static int32_t createScanCols(SPhysiPlanContext* pCxt, SScanPhysiNode* pScanPhysiNode, SNodeList* pScanCols) {
392 393 394 395
  if (NULL == pScanCols) {
    return TSDB_CODE_SUCCESS;
  }

396 397 398
  pScanPhysiNode->pScanCols = nodesCloneList(pScanCols);
  if (NULL == pScanPhysiNode->pScanCols) {
    return TSDB_CODE_OUT_OF_MEMORY;
399
  }
X
bugfix  
Xiaoyu Wang 已提交
400
  return sortScanCols(pScanPhysiNode->pScanCols);
401 402
}

X
Xiaoyu Wang 已提交
403
static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
X
Xiaoyu Wang 已提交
404
                                           SScanPhysiNode* pScanPhysiNode, SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
405 406 407
  int32_t code = createScanCols(pCxt, pScanPhysiNode, pScanLogicNode->pScanCols);
  if (TSDB_CODE_SUCCESS == code) {
    // Data block describe also needs to be set without scanning column, such as SELECT COUNT(*) FROM t
X
bugfix  
Xiaoyu Wang 已提交
408
    code = addDataBlockSlots(pCxt, pScanPhysiNode->pScanCols, pScanPhysiNode->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
409
  }
410 411 412 413 414 415 416 417 418 419 420 421

  if (TSDB_CODE_SUCCESS == code && NULL != pScanLogicNode->pScanPseudoCols) {
    pScanPhysiNode->pScanPseudoCols = nodesCloneList(pScanLogicNode->pScanPseudoCols);
    if (NULL == pScanPhysiNode->pScanPseudoCols) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = addDataBlockSlots(pCxt, pScanPhysiNode->pScanPseudoCols, pScanPhysiNode->node.pOutputDataBlockDesc);
  }

X
Xiaoyu Wang 已提交
422 423 424
  if (TSDB_CODE_SUCCESS == code) {
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pScanLogicNode, (SPhysiNode*)pScanPhysiNode);
  }
425

X
Xiaoyu Wang 已提交
426
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
427
    pScanPhysiNode->uid = pScanLogicNode->tableId;
X
Xiaoyu Wang 已提交
428
    pScanPhysiNode->suid = pScanLogicNode->stableId;
X
Xiaoyu Wang 已提交
429
    pScanPhysiNode->tableType = pScanLogicNode->tableType;
X
Xiaoyu Wang 已提交
430
    memcpy(&pScanPhysiNode->tableName, &pScanLogicNode->tableName, sizeof(SName));
X
Xiaoyu Wang 已提交
431 432 433 434 435 436
    if (NULL != pScanLogicNode->pTagCond) {
      pSubplan->pTagCond = nodesCloneNode(pScanLogicNode->pTagCond);
      if (NULL == pSubplan->pTagCond) {
        code = TSDB_CODE_OUT_OF_MEMORY;
      }
    }
X
Xiaoyu Wang 已提交
437
  }
X
Xiaoyu Wang 已提交
438

X
Xiaoyu Wang 已提交
439 440 441 442 443
  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pScanPhysiNode;
  } else {
    nodesDestroyNode(pScanPhysiNode);
  }
X
Xiaoyu Wang 已提交
444

X
Xiaoyu Wang 已提交
445
  return code;
X
Xiaoyu Wang 已提交
446 447
}

448 449
static void vgroupInfoToNodeAddr(const SVgroupInfo* vg, SQueryNodeAddr* pNodeAddr) {
  pNodeAddr->nodeId = vg->vgId;
X
Xiaoyu Wang 已提交
450
  pNodeAddr->epSet = vg->epSet;
451 452
}

453 454
static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
                                      SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
455 456
  STagScanPhysiNode* pTagScan =
      (STagScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN);
X
Xiaoyu Wang 已提交
457 458 459
  if (NULL == pTagScan) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
460
  vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
D
dapan1121 已提交
461
  SQueryNodeLoad node = {.addr = pSubplan->execNode, .load = 0};
462
  taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode);
X
Xiaoyu Wang 已提交
463
  return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTagScan, pPhyNode);
X
Xiaoyu Wang 已提交
464 465
}

X
Xiaoyu Wang 已提交
466 467 468
static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
                                        SPhysiNode** pPhyNode) {
  STableScanPhysiNode* pTableScan =
X
Xiaoyu Wang 已提交
469
      (STableScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
X
Xiaoyu Wang 已提交
470 471 472 473
  if (NULL == pTableScan) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

X
Xiaoyu Wang 已提交
474
  memcpy(pTableScan->scanSeq, pScanLogicNode->scanSeq, sizeof(pScanLogicNode->scanSeq));
X
Xiaoyu Wang 已提交
475
  pTableScan->scanRange = pScanLogicNode->scanRange;
476
  pTableScan->ratio = pScanLogicNode->ratio;
5
54liuyao 已提交
477 478 479 480 481
  if (pScanLogicNode->pVgroupList) {
    vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
    pSubplan->execNodeStat.tableNum = pScanLogicNode->pVgroupList->vgroups[0].numOfTable;
  }
  if (pCxt->pExecNodeList) {
D
dapan1121 已提交
482 483
    SQueryNodeLoad node = {.addr = pSubplan->execNode, .load = 0};
    taosArrayPush(pCxt->pExecNodeList, &node);
5
54liuyao 已提交
484
  }
D
dapan1121 已提交
485
  tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
486 487 488 489 490 491
  pTableScan->dataRequired = pScanLogicNode->dataRequired;
  pTableScan->pDynamicScanFuncs = nodesCloneList(pScanLogicNode->pDynamicScanFuncs);
  if (NULL != pScanLogicNode->pDynamicScanFuncs && NULL == pTableScan->pDynamicScanFuncs) {
    nodesDestroyNode(pTableScan);
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
492 493 494 495 496
  pTableScan->interval = pScanLogicNode->interval;
  pTableScan->offset = pScanLogicNode->offset;
  pTableScan->sliding = pScanLogicNode->sliding;
  pTableScan->intervalUnit = pScanLogicNode->intervalUnit;
  pTableScan->slidingUnit = pScanLogicNode->slidingUnit;
5
54liuyao 已提交
497 498 499
  pTableScan->triggerType = pScanLogicNode->triggerType;
  pTableScan->watermark = pScanLogicNode->watermark;
  pTableScan->tsColId = pScanLogicNode->tsColId;
5
54liuyao 已提交
500
  pTableScan->filesFactor = pScanLogicNode->filesFactor;
X
Xiaoyu Wang 已提交
501

X
Xiaoyu Wang 已提交
502
  return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode);
X
Xiaoyu Wang 已提交
503 504
}

X
Xiaoyu Wang 已提交
505 506
static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
                                              SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
507 508
  SSystemTableScanPhysiNode* pScan = (SSystemTableScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode,
                                                                               QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN);
X
Xiaoyu Wang 已提交
509 510 511 512
  if (NULL == pScan) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

D
dapan1121 已提交
513 514
  pScan->showRewrite = pScanLogicNode->showRewrite;
  pScan->accountId = pCxt->pPlanCxt->acctId;
X
Xiaoyu Wang 已提交
515 516
  if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLES)) {
    vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
517
    SQueryNodeLoad node = {.addr = pSubplan->execNode, .load = 0};
X
Xiaoyu Wang 已提交
518 519
    taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode);
  } else {
520
    SQueryNodeLoad node = {.addr = {.nodeId = MNODE_HANDLE, .epSet = pCxt->pPlanCxt->mgmtEpSet}, .load = 0};
D
dapan1121 已提交
521
    taosArrayPush(pCxt->pExecNodeList, &node);
X
Xiaoyu Wang 已提交
522
  }
X
Xiaoyu Wang 已提交
523
  pScan->mgmtEpSet = pCxt->pPlanCxt->mgmtEpSet;
D
dapan1121 已提交
524
  tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
X
Xiaoyu Wang 已提交
525

X
Xiaoyu Wang 已提交
526
  return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
X
Xiaoyu Wang 已提交
527 528
}

X
Xiaoyu Wang 已提交
529 530
static int32_t createStreamScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
                                         SPhysiNode** pPhyNode) {
5
54liuyao 已提交
531 532 533 534
  int32_t res = createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
  if (res == TSDB_CODE_SUCCESS) {
    ENodeType type = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
    setNodeType(*pPhyNode, type);
X
Xiaoyu Wang 已提交
535
  }
5
54liuyao 已提交
536
  return res;
X
Xiaoyu Wang 已提交
537 538
}

X
Xiaoyu Wang 已提交
539 540
static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
                                   SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
541 542
  switch (pScanLogicNode->scanType) {
    case SCAN_TYPE_TAG:
543
      return createTagScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
544
    case SCAN_TYPE_TABLE:
X
Xiaoyu Wang 已提交
545
      return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
546
    case SCAN_TYPE_SYSTEM_TABLE:
X
Xiaoyu Wang 已提交
547
      return createSystemTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
548
    case SCAN_TYPE_STREAM:
X
Xiaoyu Wang 已提交
549
      return createStreamScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
550 551 552
    default:
      break;
  }
X
Xiaoyu Wang 已提交
553
  return TSDB_CODE_FAILED;
X
Xiaoyu Wang 已提交
554 555
}

X
Xiaoyu Wang 已提交
556 557
static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode,
                                   SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
558
  SJoinPhysiNode* pJoin =
559
      (SJoinPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pJoinLogicNode, QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN);
X
Xiaoyu Wang 已提交
560 561 562
  if (NULL == pJoin) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
563

564 565
  SDataBlockDescNode* pLeftDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc;
  SDataBlockDescNode* pRightDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 1))->pOutputDataBlockDesc;
X
Xiaoyu Wang 已提交
566
  int32_t             code = TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
567

568 569
  pJoin->joinType = pJoinLogicNode->joinType;
  if (NULL != pJoinLogicNode->pOnConditions) {
X
Xiaoyu Wang 已提交
570 571
    code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pOnConditions,
                         &pJoin->pOnConditions);
572
  }
X
Xiaoyu Wang 已提交
573
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
574 575
    code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->node.pTargets,
                         &pJoin->pTargets);
X
Xiaoyu Wang 已提交
576 577
  }
  if (TSDB_CODE_SUCCESS == code) {
X
bugfix  
Xiaoyu Wang 已提交
578
    code = addDataBlockSlots(pCxt, pJoin->pTargets, pJoin->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
579 580 581 582
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin);
  }
X
Xiaoyu Wang 已提交
583

X
Xiaoyu Wang 已提交
584 585 586 587 588
  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pJoin;
  } else {
    nodesDestroyNode(pJoin);
  }
X
Xiaoyu Wang 已提交
589

X
Xiaoyu Wang 已提交
590
  return code;
X
Xiaoyu Wang 已提交
591 592 593
}

typedef struct SRewritePrecalcExprsCxt {
X
Xiaoyu Wang 已提交
594 595 596
  int32_t    errCode;
  int32_t    planNodeId;
  int32_t    rewriteId;
X
Xiaoyu Wang 已提交
597 598 599 600 601
  SNodeList* pPrecalcExprs;
} SRewritePrecalcExprsCxt;

static EDealRes collectAndRewrite(SRewritePrecalcExprsCxt* pCxt, SNode** pNode) {
  SNode* pExpr = nodesCloneNode(*pNode);
X
bugfix  
Xiaoyu Wang 已提交
602
  if (NULL == pExpr) {
603
    pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
X
bugfix  
Xiaoyu Wang 已提交
604 605
    return DEAL_RES_ERROR;
  }
X
Xiaoyu Wang 已提交
606
  if (nodesListAppend(pCxt->pPrecalcExprs, pExpr)) {
607
    pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
X
Xiaoyu Wang 已提交
608 609 610 611 612
    nodesDestroyNode(pExpr);
    return DEAL_RES_ERROR;
  }
  SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
  if (NULL == pCol) {
613
    pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
X
Xiaoyu Wang 已提交
614 615 616 617 618 619 620 621
    nodesDestroyNode(pExpr);
    return DEAL_RES_ERROR;
  }
  SExprNode* pRewrittenExpr = (SExprNode*)pExpr;
  pCol->node.resType = pRewrittenExpr->resType;
  if ('\0' != pRewrittenExpr->aliasName[0]) {
    strcpy(pCol->colName, pRewrittenExpr->aliasName);
  } else {
X
Xiaoyu Wang 已提交
622 623
    snprintf(pRewrittenExpr->aliasName, sizeof(pRewrittenExpr->aliasName), "#expr_%d_%d", pCxt->planNodeId,
             pCxt->rewriteId);
X
Xiaoyu Wang 已提交
624 625 626 627 628 629 630
    strcpy(pCol->colName, pRewrittenExpr->aliasName);
  }
  nodesDestroyNode(*pNode);
  *pNode = (SNode*)pCol;
  return DEAL_RES_IGNORE_CHILD;
}

631 632 633 634 635
static int32_t rewriteValueToOperator(SRewritePrecalcExprsCxt* pCxt, SNode** pNode) {
  SOperatorNode* pOper = (SOperatorNode*)nodesMakeNode(QUERY_NODE_OPERATOR);
  if (NULL == pOper) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
636 637 638 639 640
  pOper->pLeft = nodesMakeNode(QUERY_NODE_LEFT_VALUE);
  if (NULL == pOper->pLeft) {
    nodesDestroyNode(pOper);
    return TSDB_CODE_OUT_OF_MEMORY;
  }
641 642 643 644
  SValueNode* pVal = (SValueNode*)*pNode;
  pOper->node.resType = pVal->node.resType;
  strcpy(pOper->node.aliasName, pVal->node.aliasName);
  pOper->opType = OP_TYPE_ASSIGN;
645
  pOper->pRight = *pNode;
646 647 648 649
  *pNode = (SNode*)pOper;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
650 651 652
static EDealRes doRewritePrecalcExprs(SNode** pNode, void* pContext) {
  SRewritePrecalcExprsCxt* pCxt = (SRewritePrecalcExprsCxt*)pContext;
  switch (nodeType(*pNode)) {
653
    case QUERY_NODE_VALUE: {
654 655 656
      if (((SValueNode*)*pNode)->notReserved) {
        break;
      }
657 658
      pCxt->errCode = rewriteValueToOperator(pCxt, pNode);
      if (TSDB_CODE_SUCCESS != pCxt->errCode) {
659 660 661 662
        return DEAL_RES_ERROR;
      }
      return collectAndRewrite(pCxt, pNode);
    }
X
Xiaoyu Wang 已提交
663 664
    case QUERY_NODE_OPERATOR:
    case QUERY_NODE_LOGIC_CONDITION: {
665
      return collectAndRewrite(pCxt, pNode);
X
Xiaoyu Wang 已提交
666 667
    }
    case QUERY_NODE_FUNCTION: {
668
      if (fmIsScalarFunc(((SFunctionNode*)(*pNode))->funcId)) {
669
        return collectAndRewrite(pCxt, pNode);
X
Xiaoyu Wang 已提交
670 671 672 673 674 675 676 677
      }
    }
    default:
      break;
  }
  return DEAL_RES_CONTINUE;
}

X
Xiaoyu Wang 已提交
678 679
static int32_t rewritePrecalcExprs(SPhysiPlanContext* pCxt, SNodeList* pList, SNodeList** pPrecalcExprs,
                                   SNodeList** pRewrittenList) {
X
Xiaoyu Wang 已提交
680 681 682 683 684 685
  if (NULL == pList) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == *pPrecalcExprs) {
    *pPrecalcExprs = nodesMakeList();
X
bugfix  
Xiaoyu Wang 已提交
686 687 688
    if (NULL == *pPrecalcExprs) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
X
Xiaoyu Wang 已提交
689 690 691
  }
  if (NULL == *pRewrittenList) {
    *pRewrittenList = nodesMakeList();
X
bugfix  
Xiaoyu Wang 已提交
692 693 694
    if (NULL == *pRewrittenList) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
X
Xiaoyu Wang 已提交
695 696 697 698 699 700 701 702 703
  }
  SNode* pNode = NULL;
  FOREACH(pNode, pList) {
    SNode* pNew = NULL;
    if (QUERY_NODE_GROUPING_SET == nodeType(pNode)) {
      pNew = nodesCloneNode(nodesListGetNode(((SGroupingSetNode*)pNode)->pParameterList, 0));
    } else {
      pNew = nodesCloneNode(pNode);
    }
X
bugfix  
Xiaoyu Wang 已提交
704 705 706 707 708 709
    if (NULL == pNew) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    if (TSDB_CODE_SUCCESS != nodesListAppend(*pRewrittenList, pNew)) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
X
Xiaoyu Wang 已提交
710
  }
X
Xiaoyu Wang 已提交
711
  SRewritePrecalcExprsCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pPrecalcExprs = *pPrecalcExprs};
X
Xiaoyu Wang 已提交
712
  nodesRewriteExprs(*pRewrittenList, doRewritePrecalcExprs, &cxt);
713 714
  if (0 == LIST_LENGTH(cxt.pPrecalcExprs) || TSDB_CODE_SUCCESS != cxt.errCode) {
    DESTORY_LIST(*pPrecalcExprs);
X
Xiaoyu Wang 已提交
715 716 717 718
  }
  return cxt.errCode;
}

X
Xiaoyu Wang 已提交
719 720
static int32_t rewritePrecalcExpr(SPhysiPlanContext* pCxt, SNode* pNode, SNodeList** pPrecalcExprs,
                                  SNode** pRewritten) {
721 722 723 724 725
  if (NULL == pNode) {
    return TSDB_CODE_SUCCESS;
  }

  SNodeList* pList = NULL;
X
Xiaoyu Wang 已提交
726
  int32_t    code = nodesListMakeAppend(&pList, pNode);
727 728 729 730 731 732 733 734 735 736 737 738
  SNodeList* pRewrittenList = NULL;
  if (TSDB_CODE_SUCCESS == code) {
    code = rewritePrecalcExprs(pCxt, pList, pPrecalcExprs, &pRewrittenList);
  }
  if (TSDB_CODE_SUCCESS == code) {
    *pRewritten = nodesListGetNode(pRewrittenList, 0);
  }
  nodesClearList(pList);
  nodesClearList(pRewrittenList);
  return code;
}

X
Xiaoyu Wang 已提交
739 740
static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SAggLogicNode* pAggLogicNode,
                                  SPhysiNode** pPhyNode) {
741 742
  SAggPhysiNode* pAgg =
      (SAggPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pAggLogicNode, QUERY_NODE_PHYSICAL_PLAN_HASH_AGG);
X
Xiaoyu Wang 已提交
743 744 745
  if (NULL == pAgg) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
746 747 748 749

  SNodeList* pPrecalcExprs = NULL;
  SNodeList* pGroupKeys = NULL;
  SNodeList* pAggFuncs = NULL;
X
Xiaoyu Wang 已提交
750
  int32_t    code = rewritePrecalcExprs(pCxt, pAggLogicNode->pGroupKeys, &pPrecalcExprs, &pGroupKeys);
X
Xiaoyu Wang 已提交
751 752 753
  if (TSDB_CODE_SUCCESS == code) {
    code = rewritePrecalcExprs(pCxt, pAggLogicNode->pAggFuncs, &pPrecalcExprs, &pAggFuncs);
  }
X
Xiaoyu Wang 已提交
754

X
Xiaoyu Wang 已提交
755 756
  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  // push down expression to pOutputDataBlockDesc of child node
X
Xiaoyu Wang 已提交
757 758 759
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pAgg->pExprs);
    if (TSDB_CODE_SUCCESS == code) {
X
bugfix  
Xiaoyu Wang 已提交
760
      code = pushdownDataBlockSlots(pCxt, pAgg->pExprs, pChildTupe);
X
Xiaoyu Wang 已提交
761
    }
X
Xiaoyu Wang 已提交
762 763
  }

X
Xiaoyu Wang 已提交
764 765 766
  if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pGroupKeys, &pAgg->pGroupKeys);
    if (TSDB_CODE_SUCCESS == code) {
X
bugfix  
Xiaoyu Wang 已提交
767
      code = addDataBlockSlots(pCxt, pAgg->pGroupKeys, pAgg->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
768
    }
X
Xiaoyu Wang 已提交
769 770
  }

X
Xiaoyu Wang 已提交
771 772 773
  if (TSDB_CODE_SUCCESS == code && NULL != pAggFuncs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pAggFuncs, &pAgg->pAggFuncs);
    if (TSDB_CODE_SUCCESS == code) {
X
bugfix  
Xiaoyu Wang 已提交
774
      code = addDataBlockSlots(pCxt, pAgg->pAggFuncs, pAgg->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
775
    }
X
Xiaoyu Wang 已提交
776 777
  }

X
Xiaoyu Wang 已提交
778 779 780
  if (TSDB_CODE_SUCCESS == code) {
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pAggLogicNode, (SPhysiNode*)pAgg);
  }
X
Xiaoyu Wang 已提交
781

X
Xiaoyu Wang 已提交
782 783 784 785 786
  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pAgg;
  } else {
    nodesDestroyNode(pAgg);
  }
X
Xiaoyu Wang 已提交
787

X
bugfix  
Xiaoyu Wang 已提交
788 789 790 791
  nodesDestroyList(pPrecalcExprs);
  nodesDestroyList(pGroupKeys);
  nodesDestroyList(pAggFuncs);

X
Xiaoyu Wang 已提交
792
  return code;
X
Xiaoyu Wang 已提交
793 794
}

795 796 797
static int32_t createIndefRowsFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
                                            SIndefRowsFuncLogicNode* pFuncLogicNode, SPhysiNode** pPhyNode) {
  SIndefRowsFuncPhysiNode* pIdfRowsFunc = (SIndefRowsFuncPhysiNode*)makePhysiNode(
X
Xiaoyu Wang 已提交
798
      pCxt, (SLogicNode*)pFuncLogicNode, QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC);
799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831
  if (NULL == pIdfRowsFunc) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNodeList* pPrecalcExprs = NULL;
  SNodeList* pVectorFuncs = NULL;
  int32_t    code = rewritePrecalcExprs(pCxt, pFuncLogicNode->pVectorFuncs, &pPrecalcExprs, &pVectorFuncs);

  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  // push down expression to pOutputDataBlockDesc of child node
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pIdfRowsFunc->pExprs);
    if (TSDB_CODE_SUCCESS == code) {
      code = pushdownDataBlockSlots(pCxt, pIdfRowsFunc->pExprs, pChildTupe);
    }
  }

  if (TSDB_CODE_SUCCESS == code && NULL != pVectorFuncs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pVectorFuncs, &pIdfRowsFunc->pVectorFuncs);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlots(pCxt, pIdfRowsFunc->pVectorFuncs, pIdfRowsFunc->node.pOutputDataBlockDesc);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pIdfRowsFunc;
  } else {
    nodesDestroyNode(pIdfRowsFunc);
  }

  return code;
}

X
Xiaoyu Wang 已提交
832 833
static int32_t createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
                                      SProjectLogicNode* pProjectLogicNode, SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
834 835
  SProjectPhysiNode* pProject =
      (SProjectPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pProjectLogicNode, QUERY_NODE_PHYSICAL_PLAN_PROJECT);
X
Xiaoyu Wang 已提交
836 837 838
  if (NULL == pProject) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
839

840 841 842 843 844
  pProject->limit = pProjectLogicNode->limit;
  pProject->offset = pProjectLogicNode->offset;
  pProject->slimit = pProjectLogicNode->slimit;
  pProject->soffset = pProjectLogicNode->soffset;

X
Xiaoyu Wang 已提交
845 846
  int32_t code = setListSlotId(pCxt, ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc->dataBlockId,
                               -1, pProjectLogicNode->pProjections, &pProject->pProjections);
X
Xiaoyu Wang 已提交
847
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
848 849
    code = addDataBlockSlotsForProject(pCxt, pProjectLogicNode->stmtName, pProject->pProjections,
                                       pProject->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
850 851 852 853
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pProjectLogicNode, (SPhysiNode*)pProject);
  }
X
Xiaoyu Wang 已提交
854

X
Xiaoyu Wang 已提交
855 856 857 858 859
  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pProject;
  } else {
    nodesDestroyNode(pProject);
  }
X
Xiaoyu Wang 已提交
860

X
Xiaoyu Wang 已提交
861
  return code;
X
Xiaoyu Wang 已提交
862 863
}

X
Xiaoyu Wang 已提交
864 865
static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode,
                                         SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
866 867
  SExchangePhysiNode* pExchange =
      (SExchangePhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE);
X
Xiaoyu Wang 已提交
868 869 870
  if (NULL == pExchange) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
871

X
Xiaoyu Wang 已提交
872
  pExchange->srcGroupId = pExchangeLogicNode->srcGroupId;
X
bugfix  
Xiaoyu Wang 已提交
873
  *pPhyNode = (SPhysiNode*)pExchange;
X
Xiaoyu Wang 已提交
874

X
bugfix  
Xiaoyu Wang 已提交
875
  return TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
876
}
X
Xiaoyu Wang 已提交
877

X
Xiaoyu Wang 已提交
878 879
static int32_t createStreamScanPhysiNodeByExchange(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode,
                                                   SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
880 881
  SScanPhysiNode* pScan =
      (SScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
X
Xiaoyu Wang 已提交
882 883 884
  if (NULL == pScan) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
885

X
bugfix  
Xiaoyu Wang 已提交
886 887 888 889 890 891
  int32_t code = TSDB_CODE_SUCCESS;

  pScan->pScanCols = nodesCloneList(pExchangeLogicNode->node.pTargets);
  if (NULL == pScan->pScanCols) {
    code = TSDB_CODE_OUT_OF_MEMORY;
  }
L
fix  
Liu Jicong 已提交
892 893 894 895 896

  if (TSDB_CODE_SUCCESS == code) {
    code = sortScanCols(pScan->pScanCols);
  }

897 898 899
  if (TSDB_CODE_SUCCESS == code) {
    code = sortScanCols(pScan->pScanCols);
  }
X
Xiaoyu Wang 已提交
900
  if (TSDB_CODE_SUCCESS == code) {
X
bugfix  
Xiaoyu Wang 已提交
901
    code = addDataBlockSlots(pCxt, pScan->pScanCols, pScan->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
902 903 904 905 906 907 908 909 910 911 912
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pScan;
  } else {
    nodesDestroyNode(pScan);
  }

  return code;
}

X
Xiaoyu Wang 已提交
913 914
static int32_t createExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode,
                                       SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
915 916
  if (pCxt->pPlanCxt->streamQuery) {
    return createStreamScanPhysiNodeByExchange(pCxt, pExchangeLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
917
  } else {
X
Xiaoyu Wang 已提交
918
    return doCreateExchangePhysiNode(pCxt, pExchangeLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
919
  }
X
Xiaoyu Wang 已提交
920 921
}

X
Xiaoyu Wang 已提交
922 923
static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWinodwPhysiNode* pWindow,
                                             SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
924 925
  SNodeList* pPrecalcExprs = NULL;
  SNodeList* pFuncs = NULL;
X
Xiaoyu Wang 已提交
926
  int32_t    code = rewritePrecalcExprs(pCxt, pWindowLogicNode->pFuncs, &pPrecalcExprs, &pFuncs);
X
Xiaoyu Wang 已提交
927 928 929 930 931 932

  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  // push down expression to pOutputDataBlockDesc of child node
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pWindow->pExprs);
    if (TSDB_CODE_SUCCESS == code) {
X
bugfix  
Xiaoyu Wang 已提交
933
      code = addDataBlockSlots(pCxt, pWindow->pExprs, pChildTupe);
X
Xiaoyu Wang 已提交
934 935 936
    }
  }

937 938 939 940
  if (TSDB_CODE_SUCCESS == code) {
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pWindowLogicNode->pTspk, &pWindow->pTspk);
  }

X
Xiaoyu Wang 已提交
941 942 943
  if (TSDB_CODE_SUCCESS == code && NULL != pFuncs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncs, &pWindow->pFuncs);
    if (TSDB_CODE_SUCCESS == code) {
X
bugfix  
Xiaoyu Wang 已提交
944
      code = addDataBlockSlots(pCxt, pWindow->pFuncs, pWindow->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
945 946 947
    }
  }

X
Xiaoyu Wang 已提交
948 949
  pWindow->triggerType = pWindowLogicNode->triggerType;
  pWindow->watermark = pWindowLogicNode->watermark;
5
54liuyao 已提交
950
  pWindow->filesFactor = pWindowLogicNode->filesFactor;
X
Xiaoyu Wang 已提交
951

X
Xiaoyu Wang 已提交
952 953 954 955 956 957 958 959 960
  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pWindow;
  } else {
    nodesDestroyNode(pWindow);
  }

  return code;
}

X
Xiaoyu Wang 已提交
961 962 963 964
static ENodeType getIntervalOperatorType(EIntervalAlgorithm intervalAlgo) {
  switch (intervalAlgo) {
    case INTERVAL_ALGO_HASH:
      return QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL;
X
Xiaoyu Wang 已提交
965 966
    case INTERVAL_ALGO_MERGE:
      return QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL;
X
Xiaoyu Wang 已提交
967 968 969 970 971 972 973 974
    case INTERVAL_ALGO_STREAM_FINAL:
      return QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL;
    case INTERVAL_ALGO_STREAM_SEMI:
      return QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL;
    case INTERVAL_ALGO_STREAM_SINGLE:
      return QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL;
    default:
      break;
975
  }
X
Xiaoyu Wang 已提交
976
  return QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL;
977 978
}

X
Xiaoyu Wang 已提交
979 980 981
static int32_t createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
                                       SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
  SIntervalPhysiNode* pInterval = (SIntervalPhysiNode*)makePhysiNode(
X
Xiaoyu Wang 已提交
982
      pCxt, (SLogicNode*)pWindowLogicNode, getIntervalOperatorType(pWindowLogicNode->intervalAlgo));
X
Xiaoyu Wang 已提交
983 984 985
  if (NULL == pInterval) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
986 987 988 989

  pInterval->interval = pWindowLogicNode->interval;
  pInterval->offset = pWindowLogicNode->offset;
  pInterval->sliding = pWindowLogicNode->sliding;
H
Haojun Liao 已提交
990 991 992
  pInterval->intervalUnit = pWindowLogicNode->intervalUnit;
  pInterval->slidingUnit = pWindowLogicNode->slidingUnit;

X
Xiaoyu Wang 已提交
993 994 995
  return createWindowPhysiNodeFinalize(pCxt, pChildren, &pInterval->window, pWindowLogicNode, pPhyNode);
}

X
Xiaoyu Wang 已提交
996 997 998
static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
                                            SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
  SSessionWinodwPhysiNode* pSession = (SSessionWinodwPhysiNode*)makePhysiNode(
X
Xiaoyu Wang 已提交
999
      pCxt, (SLogicNode*)pWindowLogicNode,
1000
      (pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION : QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION));
X
Xiaoyu Wang 已提交
1001 1002
  if (NULL == pSession) {
    return TSDB_CODE_OUT_OF_MEMORY;
X
Xiaoyu Wang 已提交
1003 1004
  }

X
Xiaoyu Wang 已提交
1005
  pSession->gap = pWindowLogicNode->sessionGap;
X
Xiaoyu Wang 已提交
1006

X
Xiaoyu Wang 已提交
1007
  return createWindowPhysiNodeFinalize(pCxt, pChildren, &pSession->window, pWindowLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1008 1009
}

X
Xiaoyu Wang 已提交
1010 1011
static int32_t createStateWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
                                          SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
1012 1013 1014
  SStateWinodwPhysiNode* pState = (SStateWinodwPhysiNode*)makePhysiNode(
      pCxt, (SLogicNode*)pWindowLogicNode,
      (pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE : QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE));
1015 1016 1017 1018 1019
  if (NULL == pState) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNodeList* pPrecalcExprs = NULL;
X
Xiaoyu Wang 已提交
1020 1021
  SNode*     pStateKey = NULL;
  int32_t    code = rewritePrecalcExpr(pCxt, pWindowLogicNode->pStateExpr, &pPrecalcExprs, &pStateKey);
1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046

  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  // push down expression to pOutputDataBlockDesc of child node
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pState->window.pExprs);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlots(pCxt, pState->window.pExprs, pChildTupe);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pStateKey, &pState->pStateKey);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlot(pCxt, &pState->pStateKey, pState->window.node.pOutputDataBlockDesc);
    }
  }

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode(pState);
    return code;
  }

  return createWindowPhysiNodeFinalize(pCxt, pChildren, &pState->window, pWindowLogicNode, pPhyNode);
}

X
Xiaoyu Wang 已提交
1047 1048
static int32_t createWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode,
                                     SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
1049 1050
  switch (pWindowLogicNode->winType) {
    case WINDOW_TYPE_INTERVAL:
X
Xiaoyu Wang 已提交
1051
      return createIntervalPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1052
    case WINDOW_TYPE_SESSION:
X
Xiaoyu Wang 已提交
1053
      return createSessionWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1054
    case WINDOW_TYPE_STATE:
1055
      return createStateWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1056 1057 1058
    default:
      break;
  }
X
Xiaoyu Wang 已提交
1059
  return TSDB_CODE_FAILED;
X
Xiaoyu Wang 已提交
1060 1061
}

X
Xiaoyu Wang 已提交
1062 1063
static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SSortLogicNode* pSortLogicNode,
                                   SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
1064 1065
  SSortPhysiNode* pSort =
      (SSortPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pSortLogicNode, QUERY_NODE_PHYSICAL_PLAN_SORT);
X
Xiaoyu Wang 已提交
1066 1067 1068 1069 1070 1071
  if (NULL == pSort) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNodeList* pPrecalcExprs = NULL;
  SNodeList* pSortKeys = NULL;
X
Xiaoyu Wang 已提交
1072
  int32_t    code = rewritePrecalcExprs(pCxt, pSortLogicNode->pSortKeys, &pPrecalcExprs, &pSortKeys);
X
Xiaoyu Wang 已提交
1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084

  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  // push down expression to pOutputDataBlockDesc of child node
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pSort->pExprs);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlots(pCxt, pSort->pExprs, pChildTupe);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pSortKeys, &pSort->pSortKeys);
X
Xiaoyu Wang 已提交
1085 1086 1087 1088
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pSortLogicNode->node.pTargets, &pSort->pTargets);
X
Xiaoyu Wang 已提交
1089
    if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
1090
      code = addDataBlockSlots(pCxt, pSort->pTargets, pSort->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pSort;
  } else {
    nodesDestroyNode(pSort);
  }

  return code;
}

X
Xiaoyu Wang 已提交
1103 1104
static int32_t createPartitionPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
                                        SPartitionLogicNode* pPartLogicNode, SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
1105 1106
  SPartitionPhysiNode* pPart =
      (SPartitionPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pPartLogicNode, QUERY_NODE_PHYSICAL_PLAN_PARTITION);
1107 1108 1109 1110 1111 1112
  if (NULL == pPart) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNodeList* pPrecalcExprs = NULL;
  SNodeList* pPartitionKeys = NULL;
X
Xiaoyu Wang 已提交
1113
  int32_t    code = rewritePrecalcExprs(pCxt, pPartLogicNode->pPartitionKeys, &pPrecalcExprs, &pPartitionKeys);
1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125

  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  // push down expression to pOutputDataBlockDesc of child node
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pPart->pExprs);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlots(pCxt, pPart->pExprs, pChildTupe);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPartitionKeys, &pPart->pPartitionKeys);
X
Xiaoyu Wang 已提交
1126 1127 1128 1129
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPartLogicNode->node.pTargets, &pPart->pTargets);
1130
    if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
1131
      code = addDataBlockSlots(pCxt, pPart->pTargets, pPart->node.pOutputDataBlockDesc);
1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pPart;
  } else {
    nodesDestroyNode(pPart);
  }

  return code;
}

X
Xiaoyu Wang 已提交
1144 1145
static int32_t createFillPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SFillLogicNode* pFillNode,
                                   SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
1146
  SFillPhysiNode* pFill = (SFillPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pFillNode, QUERY_NODE_PHYSICAL_PLAN_FILL);
X
Xiaoyu Wang 已提交
1147 1148 1149 1150
  if (NULL == pFill) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

X
Xiaoyu Wang 已提交
1151 1152 1153
  pFill->mode = pFillNode->mode;
  pFill->timeRange = pFillNode->timeRange;

X
Xiaoyu Wang 已提交
1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182
  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  int32_t code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFillNode->node.pTargets, &pFill->pTargets);
  if (TSDB_CODE_SUCCESS == code) {
    code = addDataBlockSlots(pCxt, pFill->pTargets, pFill->node.pOutputDataBlockDesc);
  }

  if (TSDB_CODE_SUCCESS == code) {
    pFill->pWStartTs = nodesCloneNode(pFillNode->pWStartTs);
    if (NULL == pFill->pWStartTs) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  if (TSDB_CODE_SUCCESS == code && NULL != pFillNode->pValues) {
    pFill->pValues = nodesCloneNode(pFillNode->pValues);
    if (NULL == pFill->pValues) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pFill;
  } else {
    nodesDestroyNode(pFill);
  }

  return code;
}

X
Xiaoyu Wang 已提交
1183 1184 1185 1186 1187 1188
static int32_t createExchangePhysiNodeByMerge(SMergePhysiNode* pMerge) {
  SExchangePhysiNode* pExchange = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_EXCHANGE);
  if (NULL == pExchange) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pExchange->srcGroupId = pMerge->srcGroupId;
D
dapan1121 已提交
1189
  pExchange->singleChannel = true;
X
Xiaoyu Wang 已提交
1190 1191 1192 1193 1194 1195
  pExchange->node.pParent = (SPhysiNode*)pMerge;
  pExchange->node.pOutputDataBlockDesc = nodesCloneNode(pMerge->node.pOutputDataBlockDesc);
  if (NULL == pExchange->node.pOutputDataBlockDesc) {
    nodesDestroyNode(pExchange);
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
1196 1197
  SNode* pSlot = NULL;
  FOREACH(pSlot, pExchange->node.pOutputDataBlockDesc->pSlots) { ((SSlotDescNode*)pSlot)->output = true; }
X
Xiaoyu Wang 已提交
1198 1199 1200 1201
  return nodesListMakeStrictAppend(&pMerge->node.pChildren, pExchange);
}

static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SMergeLogicNode* pMergeLogicNode, SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
1202 1203
  SMergePhysiNode* pMerge =
      (SMergePhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pMergeLogicNode, QUERY_NODE_PHYSICAL_PLAN_MERGE);
X
Xiaoyu Wang 已提交
1204 1205 1206 1207 1208 1209 1210
  if (NULL == pMerge) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pMerge->numOfChannels = pMergeLogicNode->numOfChannels;
  pMerge->srcGroupId = pMergeLogicNode->srcGroupId;

X
Xiaoyu Wang 已提交
1211
  int32_t code = addDataBlockSlots(pCxt, pMergeLogicNode->pInputs, pMerge->node.pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
1212

X
Xiaoyu Wang 已提交
1213 1214 1215 1216 1217 1218
  if (TSDB_CODE_SUCCESS == code) {
    for (int32_t i = 0; i < pMerge->numOfChannels; ++i) {
      code = createExchangePhysiNodeByMerge(pMerge);
      if (TSDB_CODE_SUCCESS != code) {
        break;
      }
X
Xiaoyu Wang 已提交
1219 1220 1221 1222 1223 1224 1225 1226
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pMerge->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->pMergeKeys,
                         &pMerge->pMergeKeys);
  }

X
Xiaoyu Wang 已提交
1227 1228 1229 1230
  if (TSDB_CODE_SUCCESS == code) {
    code = setListSlotId(pCxt, pMerge->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->node.pTargets,
                         &pMerge->pTargets);
  }
X
Xiaoyu Wang 已提交
1231 1232 1233
  if (TSDB_CODE_SUCCESS == code) {
    code = addDataBlockSlots(pCxt, pMerge->pTargets, pMerge->node.pOutputDataBlockDesc);
  }
X
Xiaoyu Wang 已提交
1234

X
Xiaoyu Wang 已提交
1235 1236 1237 1238 1239 1240 1241 1242 1243
  if (TSDB_CODE_SUCCESS == code) {
    *pPhyNode = (SPhysiNode*)pMerge;
  } else {
    nodesDestroyNode(pMerge);
  }

  return code;
}

X
Xiaoyu Wang 已提交
1244 1245
static int32_t doCreatePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, SSubplan* pSubplan,
                                 SNodeList* pChildren, SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
1246
  switch (nodeType(pLogicNode)) {
X
Xiaoyu Wang 已提交
1247
    case QUERY_NODE_LOGIC_PLAN_SCAN:
X
Xiaoyu Wang 已提交
1248
      return createScanPhysiNode(pCxt, pSubplan, (SScanLogicNode*)pLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1249
    case QUERY_NODE_LOGIC_PLAN_JOIN:
X
Xiaoyu Wang 已提交
1250
      return createJoinPhysiNode(pCxt, pChildren, (SJoinLogicNode*)pLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1251
    case QUERY_NODE_LOGIC_PLAN_AGG:
X
Xiaoyu Wang 已提交
1252
      return createAggPhysiNode(pCxt, pChildren, (SAggLogicNode*)pLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1253
    case QUERY_NODE_LOGIC_PLAN_PROJECT:
X
Xiaoyu Wang 已提交
1254
      return createProjectPhysiNode(pCxt, pChildren, (SProjectLogicNode*)pLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1255
    case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
X
Xiaoyu Wang 已提交
1256
      return createExchangePhysiNode(pCxt, (SExchangeLogicNode*)pLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1257
    case QUERY_NODE_LOGIC_PLAN_WINDOW:
X
Xiaoyu Wang 已提交
1258
      return createWindowPhysiNode(pCxt, pChildren, (SWindowLogicNode*)pLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1259 1260
    case QUERY_NODE_LOGIC_PLAN_SORT:
      return createSortPhysiNode(pCxt, pChildren, (SSortLogicNode*)pLogicNode, pPhyNode);
1261 1262
    case QUERY_NODE_LOGIC_PLAN_PARTITION:
      return createPartitionPhysiNode(pCxt, pChildren, (SPartitionLogicNode*)pLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1263 1264
    case QUERY_NODE_LOGIC_PLAN_FILL:
      return createFillPhysiNode(pCxt, pChildren, (SFillLogicNode*)pLogicNode, pPhyNode);
1265 1266
    case QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC:
      return createIndefRowsFuncPhysiNode(pCxt, pChildren, (SIndefRowsFuncLogicNode*)pLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1267 1268
    case QUERY_NODE_LOGIC_PLAN_MERGE:
      return createMergePhysiNode(pCxt, (SMergeLogicNode*)pLogicNode, pPhyNode);
X
Xiaoyu Wang 已提交
1269 1270 1271
    default:
      break;
  }
X
Xiaoyu Wang 已提交
1272 1273 1274 1275

  return TSDB_CODE_FAILED;
}

X
Xiaoyu Wang 已提交
1276 1277
static int32_t createPhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, SSubplan* pSubplan,
                               SPhysiNode** pPhyNode) {
X
Xiaoyu Wang 已提交
1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295
  SNodeList* pChildren = nodesMakeList();
  if (NULL == pChildren) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  SNode* pLogicChild;
  FOREACH(pLogicChild, pLogicNode->pChildren) {
    SPhysiNode* pChild = NULL;
    code = createPhysiNode(pCxt, (SLogicNode*)pLogicChild, pSubplan, &pChild);
    if (TSDB_CODE_SUCCESS == code) {
      code = nodesListStrictAppend(pChildren, pChild);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = doCreatePhysiNode(pCxt, pLogicNode, pSubplan, pChildren, pPhyNode);
X
Xiaoyu Wang 已提交
1296
  }
X
Xiaoyu Wang 已提交
1297

X
Xiaoyu Wang 已提交
1298
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
1299 1300 1301 1302 1303 1304 1305
    if (LIST_LENGTH(pChildren) > 0) {
      (*pPhyNode)->pChildren = pChildren;
      SNode* pChild;
      FOREACH(pChild, (*pPhyNode)->pChildren) { ((SPhysiNode*)pChild)->pParent = (*pPhyNode); }
    } else {
      nodesDestroyList(pChildren);
    }
X
Xiaoyu Wang 已提交
1306 1307
  } else {
    nodesDestroyList(pChildren);
X
Xiaoyu Wang 已提交
1308 1309
  }

X
Xiaoyu Wang 已提交
1310
  return code;
X
Xiaoyu Wang 已提交
1311 1312
}

X
Xiaoyu Wang 已提交
1313
static int32_t createDataInserter(SPhysiPlanContext* pCxt, SVgDataBlocks* pBlocks, SDataSinkNode** pSink) {
1314
  SDataInserterNode* pInserter = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_INSERT);
X
Xiaoyu Wang 已提交
1315 1316 1317 1318
  if (NULL == pInserter) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

1319 1320
  pInserter->numOfTables = pBlocks->numOfTables;
  pInserter->size = pBlocks->size;
wafwerar's avatar
wafwerar 已提交
1321
  TSWAP(pInserter->pData, pBlocks->pData);
X
Xiaoyu Wang 已提交
1322 1323 1324

  *pSink = (SDataSinkNode*)pInserter;
  return TSDB_CODE_SUCCESS;
1325 1326
}

X
Xiaoyu Wang 已提交
1327
static int32_t createDataDispatcher(SPhysiPlanContext* pCxt, const SPhysiNode* pRoot, SDataSinkNode** pSink) {
X
Xiaoyu Wang 已提交
1328
  SDataDispatcherNode* pDispatcher = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_DISPATCH);
X
Xiaoyu Wang 已提交
1329 1330 1331 1332
  if (NULL == pDispatcher) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

X
Xiaoyu Wang 已提交
1333
  pDispatcher->sink.pInputDataBlockDesc = nodesCloneNode(pRoot->pOutputDataBlockDesc);
X
Xiaoyu Wang 已提交
1334 1335 1336 1337 1338 1339 1340
  if (NULL == pDispatcher->sink.pInputDataBlockDesc) {
    nodesDestroyNode(pDispatcher);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  *pSink = (SDataSinkNode*)pDispatcher;
  return TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
1341 1342
}

X
Xiaoyu Wang 已提交
1343
static SSubplan* makeSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan) {
X
Xiaoyu Wang 已提交
1344
  SSubplan* pSubplan = nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
X
Xiaoyu Wang 已提交
1345 1346 1347
  if (NULL == pSubplan) {
    return NULL;
  }
X
Xiaoyu Wang 已提交
1348
  pSubplan->id = pLogicSubplan->id;
X
Xiaoyu Wang 已提交
1349 1350
  pSubplan->subplanType = pLogicSubplan->subplanType;
  pSubplan->level = pLogicSubplan->level;
X
Xiaoyu Wang 已提交
1351 1352 1353
  return pSubplan;
}

X
Xiaoyu Wang 已提交
1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373
static int32_t buildInsertSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
  pSubplan->msgType = pModify->msgType;
  pSubplan->execNode.epSet = pModify->pVgDataBlocks->vg.epSet;
  SQueryNodeLoad node = {.addr = pSubplan->execNode, .load = 0};
  taosArrayPush(pCxt->pExecNodeList, &node);
  return createDataInserter(pCxt, pModify->pVgDataBlocks, &pSubplan->pDataSink);
}

static int32_t createDataDeleter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, const SPhysiNode* pRoot,
                                 SDataSinkNode** pSink) {
  SDataDeleterNode* pDeleter = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_DELETE);
  if (NULL == pDeleter) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pDeleter->tableId = pModify->tableId;
  pDeleter->tableType = pModify->tableType;
  strcpy(pDeleter->tableFName, pModify->tableFName);
  pDeleter->deleteTimeRange = pModify->deleteTimeRange;

X
Xiaoyu Wang 已提交
1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385
  int32_t code = setNodeSlotId(pCxt, pRoot->pOutputDataBlockDesc->dataBlockId, -1, pModify->pAffectedRows,
                               &pDeleter->pAffectedRows);
  if (TSDB_CODE_SUCCESS == code) {
    pDeleter->sink.pInputDataBlockDesc = nodesCloneNode(pRoot->pOutputDataBlockDesc);
    if (NULL == pDeleter->sink.pInputDataBlockDesc) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pSink = (SDataSinkNode*)pDeleter;
  } else {
X
Xiaoyu Wang 已提交
1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397
    nodesDestroyNode(pDeleter);
  }

  return TSDB_CODE_SUCCESS;
}

static int32_t buildDeleteSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
  int32_t code =
      createPhysiNode(pCxt, (SLogicNode*)nodesListGetNode(pModify->node.pChildren, 0), pSubplan, &pSubplan->pNode);
  if (TSDB_CODE_SUCCESS == code) {
    code = createDataDeleter(pCxt, pModify, pSubplan->pNode, &pSubplan->pDataSink);
  }
D
dapan1121 已提交
1398
  pSubplan->msgType = TDMT_VND_DELETE;
X
Xiaoyu Wang 已提交
1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418
  return code;
}

static int32_t buildVnodeModifySubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SSubplan* pSubplan) {
  int32_t                code = TSDB_CODE_SUCCESS;
  SVnodeModifyLogicNode* pModify = (SVnodeModifyLogicNode*)pLogicSubplan->pNode;
  switch (pModify->modifyType) {
    case MODIFY_TABLE_TYPE_INSERT:
      code = buildInsertSubplan(pCxt, pModify, pSubplan);
      break;
    case MODIFY_TABLE_TYPE_DELETE:
      code = buildDeleteSubplan(pCxt, pModify, pSubplan);
      break;
    default:
      code = TSDB_CODE_FAILED;
      break;
  }
  return code;
}

X
Xiaoyu Wang 已提交
1419
static int32_t createPhysiSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SSubplan** pPhysiSubplan) {
X
Xiaoyu Wang 已提交
1420
  SSubplan* pSubplan = makeSubplan(pCxt, pLogicSubplan);
X
Xiaoyu Wang 已提交
1421 1422 1423 1424 1425 1426
  if (NULL == pSubplan) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;

1427
  if (SUBPLAN_TYPE_MODIFY == pLogicSubplan->subplanType) {
X
Xiaoyu Wang 已提交
1428
    code = buildVnodeModifySubplan(pCxt, pLogicSubplan, pSubplan);
1429
  } else {
1430
    pSubplan->msgType = TDMT_VND_QUERY;
X
Xiaoyu Wang 已提交
1431 1432 1433 1434
    code = createPhysiNode(pCxt, pLogicSubplan->pNode, pSubplan, &pSubplan->pNode);
    if (TSDB_CODE_SUCCESS == code && !pCxt->pPlanCxt->streamQuery && !pCxt->pPlanCxt->topicQuery) {
      code = createDataDispatcher(pCxt, pSubplan->pNode, &pSubplan->pDataSink);
    }
1435
  }
X
Xiaoyu Wang 已提交
1436

X
Xiaoyu Wang 已提交
1437 1438 1439 1440
  if (TSDB_CODE_SUCCESS == code) {
    *pPhysiSubplan = pSubplan;
  } else {
    nodesDestroyNode(pSubplan);
X
Xiaoyu Wang 已提交
1441
  }
X
Xiaoyu Wang 已提交
1442

X
Xiaoyu Wang 已提交
1443
  return code;
X
Xiaoyu Wang 已提交
1444 1445
}

X
Xiaoyu Wang 已提交
1446 1447 1448 1449 1450 1451 1452 1453 1454
static SQueryPlan* makeQueryPhysiPlan(SPhysiPlanContext* pCxt) {
  SQueryPlan* pPlan = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN);
  if (NULL == pPlan) {
    return NULL;
  }
  pPlan->pSubplans = nodesMakeList();
  if (NULL == pPlan->pSubplans) {
    nodesDestroyNode(pPlan);
    return NULL;
1455
  }
X
Xiaoyu Wang 已提交
1456 1457
  pPlan->queryId = pCxt->pPlanCxt->queryId;
  return pPlan;
1458 1459 1460 1461 1462 1463
}

static int32_t pushSubplan(SPhysiPlanContext* pCxt, SNodeptr pSubplan, int32_t level, SNodeList* pSubplans) {
  SNodeListNode* pGroup;
  if (level >= LIST_LENGTH(pSubplans)) {
    pGroup = nodesMakeNode(QUERY_NODE_NODE_LIST);
X
bugfix  
Xiaoyu Wang 已提交
1464 1465 1466 1467 1468 1469
    if (NULL == pGroup) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pSubplans, pGroup)) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
1470 1471 1472 1473 1474
  } else {
    pGroup = nodesListGetNode(pSubplans, level);
  }
  if (NULL == pGroup->pNodeList) {
    pGroup->pNodeList = nodesMakeList();
X
bugfix  
Xiaoyu Wang 已提交
1475 1476 1477
    if (NULL == pGroup->pNodeList) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
1478
  }
X
bugfix  
Xiaoyu Wang 已提交
1479
  return nodesListStrictAppend(pGroup->pNodeList, pSubplan);
1480 1481
}

X
Xiaoyu Wang 已提交
1482 1483
static int32_t buildPhysiPlan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SSubplan* pParent,
                              SQueryPlan* pQueryPlan) {
X
Xiaoyu Wang 已提交
1484
  SSubplan* pSubplan = NULL;
X
Xiaoyu Wang 已提交
1485
  int32_t   code = createPhysiSubplan(pCxt, pLogicSubplan, &pSubplan);
X
Xiaoyu Wang 已提交
1486

X
Xiaoyu Wang 已提交
1487 1488 1489
  if (TSDB_CODE_SUCCESS == code) {
    code = pushSubplan(pCxt, pSubplan, pLogicSubplan->level, pQueryPlan->pSubplans);
    ++(pQueryPlan->numOfSubplans);
1490 1491
  }

1492 1493 1494 1495 1496
  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode(pSubplan);
    return code;
  }

X
Xiaoyu Wang 已提交
1497 1498 1499 1500
  if (TSDB_CODE_SUCCESS == code && NULL != pParent) {
    code = nodesListMakeAppend(&pParent->pChildren, pSubplan);
    if (TSDB_CODE_SUCCESS == code) {
      code = nodesListMakeAppend(&pSubplan->pParents, pParent);
X
Xiaoyu Wang 已提交
1501 1502
    }
  }
X
Xiaoyu Wang 已提交
1503

X
Xiaoyu Wang 已提交
1504 1505 1506 1507 1508 1509
  if (TSDB_CODE_SUCCESS == code) {
    SNode* pChild = NULL;
    FOREACH(pChild, pLogicSubplan->pChildren) {
      code = buildPhysiPlan(pCxt, (SLogicSubplan*)pChild, pSubplan, pQueryPlan);
      if (TSDB_CODE_SUCCESS != code) {
        break;
X
Xiaoyu Wang 已提交
1510 1511 1512
      }
    }
  }
X
Xiaoyu Wang 已提交
1513

X
Xiaoyu Wang 已提交
1514
  return code;
1515 1516
}

X
Xiaoyu Wang 已提交
1517 1518 1519 1520
static int32_t doCreatePhysiPlan(SPhysiPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPhysiPlan) {
  SQueryPlan* pPlan = makeQueryPhysiPlan(pCxt);
  if (NULL == pPlan) {
    return TSDB_CODE_OUT_OF_MEMORY;
1521
  }
X
Xiaoyu Wang 已提交
1522

X
Xiaoyu Wang 已提交
1523
  int32_t code = TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
1524

X
Xiaoyu Wang 已提交
1525 1526 1527 1528 1529 1530
  SNode* pSubplan = NULL;
  FOREACH(pSubplan, pLogicPlan->pTopSubplans) {
    code = buildPhysiPlan(pCxt, (SLogicSubplan*)pSubplan, NULL, pPlan);
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
X
Xiaoyu Wang 已提交
1531 1532
  }

X
Xiaoyu Wang 已提交
1533 1534 1535 1536
  if (TSDB_CODE_SUCCESS == code) {
    *pPhysiPlan = pPlan;
  } else {
    nodesDestroyNode(pPlan);
X
Xiaoyu Wang 已提交
1537 1538
  }

X
Xiaoyu Wang 已提交
1539
  return code;
X
Xiaoyu Wang 已提交
1540
}
X
Xiaoyu Wang 已提交
1541

X
Xiaoyu Wang 已提交
1542
static void destoryLocationHash(void* p) {
X
Xiaoyu Wang 已提交
1543
  SHashObj*   pHash = *(SHashObj**)p;
X
bugfix  
Xiaoyu Wang 已提交
1544 1545 1546 1547 1548
  SSlotIndex* pIndex = taosHashIterate(pHash, NULL);
  while (NULL != pIndex) {
    taosArrayDestroy(pIndex->pSlotIdsInfo);
    pIndex = taosHashIterate(pHash, pIndex);
  }
X
Xiaoyu Wang 已提交
1549 1550 1551 1552 1553 1554 1555
  taosHashCleanup(pHash);
}

static void destoryPhysiPlanContext(SPhysiPlanContext* pCxt) {
  taosArrayDestroyEx(pCxt->pLocationHelper, destoryLocationHash);
}

1556 1557 1558 1559 1560
static void setExplainInfo(SPlanContext* pCxt, SQueryPlan* pPlan) {
  if (QUERY_NODE_EXPLAIN_STMT == nodeType(pCxt->pAstRoot)) {
    SExplainStmt* pStmt = (SExplainStmt*)pCxt->pAstRoot;
    pPlan->explainInfo.mode = pStmt->analyze ? EXPLAIN_MODE_ANALYZE : EXPLAIN_MODE_STATIC;
    pPlan->explainInfo.verbose = pStmt->pOptions->verbose;
1561
    pPlan->explainInfo.ratio = pStmt->pOptions->ratio;
1562 1563 1564 1565 1566
  } else {
    pPlan->explainInfo.mode = EXPLAIN_MODE_DISABLE;
  }
}

X
Xiaoyu Wang 已提交
1567
int32_t createPhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan, SArray* pExecNodeList) {
X
Xiaoyu Wang 已提交
1568 1569 1570 1571 1572
  SPhysiPlanContext cxt = {.pPlanCxt = pCxt,
                           .errCode = TSDB_CODE_SUCCESS,
                           .nextDataBlockId = 0,
                           .pLocationHelper = taosArrayInit(32, POINTER_BYTES),
                           .pExecNodeList = pExecNodeList};
X
Xiaoyu Wang 已提交
1573 1574 1575
  if (NULL == cxt.pLocationHelper) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
1576

1577 1578
  if (QUERY_POLICY_VNODE == tsQueryPolicy) {
    taosArrayClear(pExecNodeList);
X
Xiaoyu Wang 已提交
1579
  }
1580 1581

  int32_t code = doCreatePhysiPlan(&cxt, pLogicPlan, pPlan);
1582 1583 1584 1585
  if (TSDB_CODE_SUCCESS == code) {
    setExplainInfo(pCxt, *pPlan);
  }

X
Xiaoyu Wang 已提交
1586 1587
  destoryPhysiPlanContext(&cxt);
  return code;
X
Xiaoyu Wang 已提交
1588
}