physicalPlan.c 10.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "plannerInt.h"
X
Xiaoyu Wang 已提交
17
#include "exception.h"
18
#include "parser.h"
19

20 21 22
#define STORE_CURRENT_SUBPLAN(cxt) SSubplan* _ = cxt->pCurrentSubplan
#define RECOVERY_CURRENT_SUBPLAN(cxt) cxt->pCurrentSubplan = _

23 24 25 26 27 28 29
typedef struct SPlanContext {
  struct SCatalog* pCatalog;
  struct SQueryDag* pDag;
  SSubplan* pCurrentSubplan;
  SSubplanId nextId;
} SPlanContext;

30 31 32 33 34 35
static const char* gOpName[] = {
  "Unknown",
#define INCLUDE_AS_NAME
#include "plannerOp.h"
#undef INCLUDE_AS_NAME
};
X
Xiaoyu Wang 已提交
36

X
Xiaoyu Wang 已提交
37 38 39 40 41 42 43
static void* vailidPointer(void* p) {
  if (NULL == p) {
    THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
  }
  return p;
}

44 45 46 47
const char* opTypeToOpName(int32_t type) {
  return gOpName[type];
}

48 49 50 51 52 53 54 55
int32_t opNameToOpType(const char* name) {
  for (int32_t i = 1; i < sizeof(gOpName) / sizeof(gOpName[0]); ++i) {
    if (strcmp(name, gOpName[i])) {
      return i;
    }
  }
  return OP_Unknown;
}
X
Xiaoyu Wang 已提交
56

X
Xiaoyu Wang 已提交
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
static SDataSink* initDataSink(int32_t type, int32_t size) {
  SDataSink* sink = (SDataSink*)vailidPointer(calloc(1, size));
  sink->info.type = type;
  return sink;
}

static SDataSink* createDataDispatcher(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
  SDataDispatcher* dispatcher = (SDataDispatcher*)initDataSink(DSINK_Dispatch, sizeof(SDataDispatcher));
  return (SDataSink*)dispatcher;
}

static SDataSink* createDataInserter(SPlanContext* pCxt, SVgDataBlocks* pBlocks) {
  SDataInserter* inserter = (SDataInserter*)initDataSink(DSINK_Insert, sizeof(SDataInserter));
  inserter->numOfTables = pBlocks->numOfTables;
  inserter->size = pBlocks->size;
  SWAP(inserter->pData, pBlocks->pData, char*);
  return (SDataSink*)inserter;
}

76
static bool toDataBlockSchema(SQueryPlanNode* pPlanNode, SDataBlockSchema* dataBlockSchema) {
X
Xiaoyu Wang 已提交
77
  dataBlockSchema->numOfCols = pPlanNode->numOfCols;
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
  dataBlockSchema->pSchema = malloc(sizeof(SSlotSchema) * pPlanNode->numOfCols);
  if (NULL == dataBlockSchema->pSchema) {
    return false;
  }
  memcpy(dataBlockSchema->pSchema, pPlanNode->pSchema, sizeof(SSlotSchema) * pPlanNode->numOfCols);
  return true;
}

static bool cloneExprArray(SArray** dst, SArray* src) {
  if (NULL == src) {
    return true;
  }
  size_t size = taosArrayGetSize(src);
  if (0 == size) {
    return true;
  }
  *dst = taosArrayInit(size, POINTER_BYTES);
  if (NULL == *dst) {
    return false;
  }
  return (TSDB_CODE_SUCCESS == copyAllExprInfo(*dst, src, true) ? true : false);
X
Xiaoyu Wang 已提交
99 100
}

101
static SPhyNode* initPhyNode(SQueryPlanNode* pPlanNode, int32_t type, int32_t size) {
X
Xiaoyu Wang 已提交
102
  SPhyNode* node = (SPhyNode*)vailidPointer(calloc(1, size));
X
Xiaoyu Wang 已提交
103
  node->info.type = type;
104 105 106 107 108 109
  node->info.name = opTypeToOpName(type);
  if (!cloneExprArray(&node->pTargets, pPlanNode->pExpr) || !toDataBlockSchema(pPlanNode, &(node->targetSchema))) {
    free(node);
    return NULL;
  }
  return node;
110 111
}

112 113 114 115 116 117 118 119 120 121 122
static SPhyNode* initScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t type, int32_t size) {
  SScanPhyNode* node = (SScanPhyNode*)initPhyNode(pPlanNode, type, size);
  node->uid = pTable->pMeta->pTableMeta->uid;
  node->tableType = pTable->pMeta->pTableMeta->tableType;
  return (SPhyNode*)node;
}

static SPhyNode* createPseudoScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t op) {
  return initScanNode(pPlanNode, pTable, op, sizeof(SScanPhyNode));
}

123
static SPhyNode* createTagScanNode(SQueryPlanNode* pPlanNode) {
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
  SQueryTableInfo* pTable = (SQueryTableInfo*)pPlanNode->pExtInfo;
  return createPseudoScanNode(pPlanNode, pTable, OP_TagScan);
}

static uint8_t getScanFlag(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
  // todo
  return MASTER_SCAN;
}

static SPhyNode* createUserTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t op) {
  STableScanPhyNode* node = (STableScanPhyNode*)initScanNode(pPlanNode, pTable, op, sizeof(STableScanPhyNode));
  node->scanFlag = getScanFlag(pPlanNode, pTable);
  node->window = pTable->window;
  // todo tag cond
  return (SPhyNode*)node;
}

static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
  return createUserTableScanNode(pPlanNode, pTable, OP_TableScan);
}

static bool isSystemTable(SQueryTableInfo* pTable) {
  // todo
  return false;
}

static bool needSeqScan(SQueryPlanNode* pPlanNode) {
  // todo
  return false;
}

static SPhyNode* createMultiTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
  if (isSystemTable(pTable)) {
    return createPseudoScanNode(pPlanNode, pTable, OP_SystemTableScan);
  } else if (needSeqScan(pPlanNode)) {
    return createUserTableScanNode(pPlanNode, pTable, OP_TableSeqScan);
  }
  return createUserTableScanNode(pPlanNode, pTable, OP_DataBlocksOptScan);
162 163 164
}

static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
X
Xiaoyu Wang 已提交
165
  SSubplan* subplan = vailidPointer(calloc(1, sizeof(SSubplan)));
166 167 168 169 170 171 172
  subplan->id = pCxt->nextId;
  ++(pCxt->nextId.subplanId);
  subplan->type = type;
  subplan->level = 0;
  if (NULL != pCxt->pCurrentSubplan) {
    subplan->level = pCxt->pCurrentSubplan->level + 1;
    if (NULL == pCxt->pCurrentSubplan->pChildern) {
X
Xiaoyu Wang 已提交
173
      pCxt->pCurrentSubplan->pChildern = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
174
    }
175
    taosArrayPush(pCxt->pCurrentSubplan->pChildern, &subplan);
X
Xiaoyu Wang 已提交
176
    subplan->pParents = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
177 178 179 180
    taosArrayPush(subplan->pParents, &pCxt->pCurrentSubplan);
  }
  SArray* currentLevel;
  if (subplan->level >= taosArrayGetSize(pCxt->pDag->pSubplans)) {
X
Xiaoyu Wang 已提交
181
    currentLevel = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
182 183 184
    taosArrayPush(pCxt->pDag->pSubplans, &currentLevel);
  } else {
    currentLevel = taosArrayGetP(pCxt->pDag->pSubplans, subplan->level);
185
  }
186
  taosArrayPush(currentLevel, &subplan);
187 188 189 190
  pCxt->pCurrentSubplan = subplan;
  return subplan;
}

X
Xiaoyu Wang 已提交
191 192 193 194 195 196 197 198 199 200 201
static void vgroupInfoToEpSet(const SVgroupInfo* vg, SEpSet* epSet) {
  epSet->inUse = 0; // todo
  epSet->numOfEps = vg->numOfEps;
  for (int8_t i = 0; i < vg->numOfEps; ++i) {
    epSet->port[i] = vg->epAddr[i].port;
    strcpy(epSet->fqdn[i], vg->epAddr[i].fqdn);
  }
  return;
}

static void vgroupMsgToEpSet(const SVgroupMsg* vg, SEpSet* epSet) {
202 203 204 205 206 207 208
  epSet->inUse = 0; // todo
  epSet->numOfEps = vg->numOfEps;
  for (int8_t i = 0; i < vg->numOfEps; ++i) {
    epSet->port[i] = vg->epAddr[i].port;
    strcpy(epSet->fqdn[i], vg->epAddr[i].fqdn);
  }
  return;
X
Xiaoyu Wang 已提交
209 210
}

211
static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
212 213
  SVgroupsInfo* vgroupList = pTable->pMeta->vgroupList;
  for (int32_t i = 0; i < pTable->pMeta->vgroupList->numOfVgroups; ++i) {
214
    STORE_CURRENT_SUBPLAN(pCxt);
215
    SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN);
X
Xiaoyu Wang 已提交
216
    vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[i]), &subplan->execEpSet);
217
    subplan->pNode = createMultiTableScanNode(pPlanNode, pTable);
X
Xiaoyu Wang 已提交
218
    subplan->pDataSink = createDataDispatcher(pCxt, pPlanNode);
219
    RECOVERY_CURRENT_SUBPLAN(pCxt);
220
  }
221
  return pCxt->nextId.templateId++;
X
Xiaoyu Wang 已提交
222 223
}

224 225 226 227 228
static SPhyNode* createExchangeNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, uint64_t srcTemplateId) {
  SExchangePhyNode* node = (SExchangePhyNode*)initPhyNode(pPlanNode, OP_Exchange, sizeof(SExchangePhyNode));
  node->srcTemplateId = srcTemplateId;
  return (SPhyNode*)node;
}
229

230 231 232
static bool needMultiNodeScan(SQueryTableInfo* pTable) {
  // todo system table, for instance, user_tables
  return (TSDB_SUPER_TABLE == pTable->pMeta->pTableMeta->tableType);
233 234
}

235
static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
236
  SQueryTableInfo* pTable = (SQueryTableInfo*)pPlanNode->pExtInfo;
237 238
  if (needMultiNodeScan(pTable)) {
    return createExchangeNode(pCxt, pPlanNode, splitSubplanByTable(pCxt, pPlanNode, pTable));
239
  }
240
  return createSingleTableScanNode(pPlanNode, pTable);
241 242 243
}

static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
X
Xiaoyu Wang 已提交
244 245 246 247 248 249
  SPhyNode* node = NULL;
  switch (pPlanNode->info.type) {
    case QNODE_TAGSCAN:
      node = createTagScanNode(pPlanNode);
      break;
    case QNODE_TABLESCAN:
250
      node = createTableScanNode(pCxt, pPlanNode);
X
Xiaoyu Wang 已提交
251
      break;
X
Xiaoyu Wang 已提交
252 253 254
    case QNODE_INSERT:
      // Insert is not an operator in a physical plan.
      break;
X
Xiaoyu Wang 已提交
255 256 257 258
    default:
      assert(false);
  }
  if (pPlanNode->pChildren != NULL && taosArrayGetSize(pPlanNode->pChildren) > 0) {
X
Xiaoyu Wang 已提交
259
    node->pChildren = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
X
Xiaoyu Wang 已提交
260 261
    size_t size = taosArrayGetSize(pPlanNode->pChildren);
    for(int32_t i = 0; i < size; ++i) {
262
      SPhyNode* child = createPhyNode(pCxt, taosArrayGetP(pPlanNode->pChildren, i));
X
Xiaoyu Wang 已提交
263 264 265
      child->pParent = node;
      taosArrayPush(node->pChildren, &child);
    }
266
  }
X
Xiaoyu Wang 已提交
267
  return node;
268 269
}

X
Xiaoyu Wang 已提交
270 271 272 273 274 275 276 277 278 279 280 281
static void splitInsertSubplan(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
  SArray* vgs = (SArray*)pPlanNode->pExtInfo;
  size_t numOfVg = taosArrayGetSize(vgs);
  for (int32_t i = 0; i < numOfVg; ++i) {
    SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MODIFY);
    SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(vgs, i);
    vgroupInfoToEpSet(&blocks->vg, &subplan->execEpSet);
    subplan->pNode = NULL;
    subplan->pDataSink = createDataInserter(pCxt, blocks);
  }
}

282
static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) {
X
Xiaoyu Wang 已提交
283 284 285 286 287 288 289 290
  if (QNODE_INSERT == pRoot->info.type) {
    splitInsertSubplan(pCxt, pRoot);
  } else {
    SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MERGE);
    ++(pCxt->nextId.templateId);
    subplan->pNode = createPhyNode(pCxt, pRoot);
    subplan->pDataSink = createDataDispatcher(pCxt, pRoot);
  }
291 292 293 294
  // todo deal subquery
}

int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag) {
X
Xiaoyu Wang 已提交
295 296 297 298 299 300 301 302 303 304 305 306 307 308 309
  TRY(TSDB_MAX_TAG_CONDITIONS) {
    SPlanContext context = {
      .pCatalog = pCatalog,
      .pDag = vailidPointer(calloc(1, sizeof(SQueryDag))),
      .pCurrentSubplan = NULL,
      .nextId = {0} // todo queryid
    };
    *pDag = context.pDag;
    context.pDag->pSubplans = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
    createSubplanByLevel(&context, pQueryNode);
  } CATCH(code) {
    CLEANUP_EXECUTE();
    terrno = code;
    return TSDB_CODE_FAILED;
  } END_TRY
310
  return TSDB_CODE_SUCCESS;
311
}
X
Xiaoyu Wang 已提交
312

D
dapan 已提交
313
int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep) {
314
  //todo
X
Xiaoyu Wang 已提交
315
}