physicalPlan.c 13.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "plannerInt.h"
X
Xiaoyu Wang 已提交
17
#include "exception.h"
18
#include "parser.h"
19

20 21 22
/*
 * Save / restore the planner context's current subplan around code that
 * creates a nested subplan.
 *
 * STORE_CURRENT_SUBPLAN declares a local variable named `_` in the enclosing
 * scope, so the two macros must be used as a pair inside the same block and
 * at most once per block. The context argument is parenthesized so that any
 * pointer expression (for example `&ctx`) expands correctly.
 */
#define STORE_CURRENT_SUBPLAN(cxt) SSubplan* _ = (cxt)->pCurrentSubplan
#define RECOVERY_CURRENT_SUBPLAN(cxt) (cxt)->pCurrentSubplan = _

23 24 25 26 27 28 29
/* Mutable state threaded through the physical-plan builder while one DAG is being created. */
typedef struct SPlanContext {
  struct SCatalog* pCatalog;   // catalog handle handed to createDag(); not read by the code in this file
  struct SQueryDag* pDag;      // DAG under construction; initSubplan() registers every new subplan in it
  SSubplan* pCurrentSubplan;   // subplan most recently created by initSubplan(); becomes the parent of the next one
  SSubplanId nextId;           // id copied into the next subplan; subplanId / templateId are bumped as plans are created
} SPlanContext;

30 31 32 33 34 35
/*
 * Operator names, indexed by operator type (see opTypeToOpName / opNameToOpType).
 * Entry 0 is the "Unknown" placeholder; the remaining entries come from
 * including plannerOp.h with INCLUDE_AS_NAME defined (presumably one string
 * literal per operator -- confirm against plannerOp.h).
 */
static const char* gOpName[] = {
  "Unknown",
#define INCLUDE_AS_NAME
#include "plannerOp.h"
#undef INCLUDE_AS_NAME
};
X
Xiaoyu Wang 已提交
36

37
static void* validPointer(void* p) {
X
Xiaoyu Wang 已提交
38 39 40 41 42 43
  if (NULL == p) {
    THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
  }
  return p;
}

44 45 46 47
/*
 * Translate an operator type into its printable name.
 *
 * @param type  operator type, used as an index into gOpName
 * @return the matching name; "Unknown" (gOpName[0]) when type lies outside
 *         the table, instead of reading past the end of the array
 */
const char* opTypeToOpName(int32_t type) {
  const int32_t numOfOps = (int32_t)(sizeof(gOpName) / sizeof(gOpName[0]));
  if (type < 0 || type >= numOfOps) {
    return gOpName[0];
  }
  return gOpName[type];
}

48 49
/*
 * Translate an operator name into its operator type.
 *
 * @param name  NUL-terminated operator name; may be NULL
 * @return the index of the matching gOpName entry (searching from index 1,
 *         so the "Unknown" placeholder is never matched), or OP_Unknown when
 *         name is NULL or not found
 */
int32_t opNameToOpType(const char* name) {
  if (NULL == name) {
    // strcmp() on a NULL pointer is undefined behaviour.
    return OP_Unknown;
  }

  const int32_t numOfOps = (int32_t)(sizeof(gOpName) / sizeof(gOpName[0]));
  for (int32_t i = 1; i < numOfOps; ++i) {
    if (0 == strcmp(name, gOpName[i])) {
      return i;
    }
  }
  return OP_Unknown;
}
X
Xiaoyu Wang 已提交
56

57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
/*
 * Translate a data-sink type into its printable name.
 *
 * @return "Dispatch" for DSINK_Dispatch, "Insert" for DSINK_Insert,
 *         "Unknown" for every other value
 */
const char* dsinkTypeToDsinkName(int32_t type) {
  if (DSINK_Dispatch == type) {
    return "Dispatch";
  }
  if (DSINK_Insert == type) {
    return "Insert";
  }
  return "Unknown";
}

/*
 * Translate a data-sink name into its type.
 *
 * @param name  NUL-terminated sink name; may be NULL
 * @return DSINK_Dispatch for "Dispatch", DSINK_Insert for "Insert",
 *         DSINK_Unknown for NULL or any other string
 */
int32_t dsinkNameToDsinkType(const char* name) {
  if (NULL == name) {
    // strcmp() on a NULL pointer is undefined behaviour.
    return DSINK_Unknown;
  }
  if (0 == strcmp(name, "Dispatch")) {
    return DSINK_Dispatch;
  }
  if (0 == strcmp(name, "Insert")) {
    return DSINK_Insert;
  }
  return DSINK_Unknown;
}

78 79 80 81 82 83 84 85 86 87
/*
 * Deep-copy a data block schema.
 *
 * Allocates a fresh slot array for dst and copies src's slots and scalar
 * attributes into it.
 *
 * @return true on success; false when the slot array cannot be allocated
 *         (dst->pSchema is NULL and the other dst fields are left untouched)
 */
static bool copySchema(SDataBlockSchema* dst, const SDataBlockSchema* src) {
  const size_t bytes = sizeof(SSlotSchema) * src->numOfCols;

  dst->pSchema = malloc(bytes);
  if (dst->pSchema != NULL) {
    memcpy(dst->pSchema, src->pSchema, bytes);
    dst->numOfCols = src->numOfCols;
    dst->resultRowSize = src->resultRowSize;
    dst->precision = src->precision;
    return true;
  }
  return false;
}

90
/*
 * Build the output schema of a plan node from its expression list.
 *
 * One slot is produced per expression (pPlanNode->numOfExpr); each slot is a
 * copy of the expression's result schema, and resultRowSize is the sum of the
 * slot widths.
 *
 * The slot array is sized by numOfExpr -- the number of entries that are
 * actually written and recorded in dataBlockSchema->numOfCols -- rather than
 * by pPlanNode->numOfCols, which could leave the buffer too small.
 *
 * @return true on success; false when the slot array cannot be allocated
 */
static bool toDataBlockSchema(SQueryPlanNode* pPlanNode, SDataBlockSchema* dataBlockSchema) {
  dataBlockSchema->numOfCols = pPlanNode->numOfExpr;
  dataBlockSchema->pSchema = malloc(sizeof(SSlotSchema) * pPlanNode->numOfExpr);
  if (NULL == dataBlockSchema->pSchema) {
    return false;
  }

  dataBlockSchema->resultRowSize = 0;
  for (int32_t i = 0; i < pPlanNode->numOfExpr; ++i) {
    SExprInfo* pExprInfo = taosArrayGetP(pPlanNode->pExpr, i);
    memcpy(&dataBlockSchema->pSchema[i], &pExprInfo->base.resSchema, sizeof(SSlotSchema));

    dataBlockSchema->resultRowSize += dataBlockSchema->pSchema[i].bytes;
  }

  return true;
}

/*
 * Clone an expression array.
 *
 * A NULL or empty src is treated as success and leaves *dst untouched.
 * Otherwise a new pointer array is stored in *dst and filled through
 * copyAllExprInfo().
 *
 * @return true on success; false when the array cannot be allocated or the
 *         copy does not report TSDB_CODE_SUCCESS
 */
static bool cloneExprArray(SArray** dst, SArray* src) {
  const size_t numOfExprs = (NULL != src) ? taosArrayGetSize(src) : 0;
  if (0 == numOfExprs) {
    return true;
  }

  SArray* pCopy = taosArrayInit(numOfExprs, POINTER_BYTES);
  *dst = pCopy;
  if (NULL == pCopy) {
    return false;
  }

  return TSDB_CODE_SUCCESS == copyAllExprInfo(pCopy, src, true);
}

123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
/*
 * Allocate and initialize the common part of a data sink.
 *
 * @param type   sink type, also used to look up the sink name
 * @param size   size in bytes of the concrete sink structure to allocate
 * @param pRoot  root physical node whose target schema is copied into the
 *               sink; NULL skips the schema copy
 * @return the zero-initialized sink; allocation failures are raised through
 *         THROW(TSDB_CODE_TSC_OUT_OF_MEMORY)
 */
static SDataSink* initDataSink(int32_t type, int32_t size, const SPhyNode* pRoot) {
  SDataSink* sink = (SDataSink*)validPointer(calloc(1, size));
  sink->info.type = type;
  sink->info.name = dsinkTypeToDsinkName(type);

  const bool schemaReady = (NULL == pRoot) || copySchema(&sink->schema, &pRoot->targetSchema);
  if (!schemaReady) {
    tfree(sink);
    THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
  }
  return sink;
}

/*
 * Create an insert sink for one vgroup's data blocks.
 *
 * The table count and payload size are copied from pBlocks, and the payload
 * buffer is exchanged with the sink's (initially zeroed) pData via SWAP.
 * pCxt is not used.
 */
static SDataSink* createDataInserter(SPlanContext* pCxt, SVgDataBlocks* pBlocks, const SPhyNode* pRoot) {
  SDataSink* pSink = initDataSink(DSINK_Insert, sizeof(SDataInserter), pRoot);
  SDataInserter* pInserter = (SDataInserter*)pSink;

  pInserter->numOfTables = pBlocks->numOfTables;
  pInserter->size = pBlocks->size;
  SWAP(pInserter->pData, pBlocks->pData, char*);
  return pSink;
}

/*
 * Create a dispatch sink whose schema is copied from pRoot.
 * pCxt and pPlanNode are not used.
 */
static SDataSink* createDataDispatcher(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, const SPhyNode* pRoot) {
  return initDataSink(DSINK_Dispatch, sizeof(SDataDispatcher), pRoot);
}

147
/*
 * Allocate and initialize the common part of a physical plan node.
 *
 * The node's targets are cloned from the logical node's expressions and its
 * target schema is derived from them.
 *
 * @param pPlanNode  logical plan node the physical node is built from
 * @param type       operator type, also used to look up the operator name
 * @param size       size in bytes of the concrete node structure to allocate
 * @return the new node; failures are raised through
 *         THROW(TSDB_CODE_TSC_OUT_OF_MEMORY)
 */
static SPhyNode* initPhyNode(SQueryPlanNode* pPlanNode, int32_t type, int32_t size) {
  SPhyNode* node = (SPhyNode*)validPointer(calloc(1, size));
  node->info.type = type;
  node->info.name = opTypeToOpName(type);

  bool ok = cloneExprArray(&node->pTargets, pPlanNode->pExpr);
  if (ok) {
    // Only derive the schema once the targets were cloned successfully.
    ok = toDataBlockSchema(pPlanNode, &(node->targetSchema));
  }
  if (!ok) {
    free(node);
    THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
  }
  return node;
}

158 159 160 161 162 163 164 165 166 167 168
/*
 * Create a scan node of the given operator type and concrete size, and record
 * the scanned table's uid and table type from its table meta.
 */
static SPhyNode* initScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t type, int32_t size) {
  SPhyNode* pBase = initPhyNode(pPlanNode, type, size);
  SScanPhyNode* pScan = (SScanPhyNode*)pBase;

  pScan->uid = pTable->pMeta->pTableMeta->uid;
  pScan->tableType = pTable->pMeta->pTableMeta->tableType;
  return pBase;
}

/* Create a plain SScanPhyNode for operator `op`, with no scan-specific fields beyond the base scan node. */
static SPhyNode* createPseudoScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t op) {
  const int32_t nodeSize = sizeof(SScanPhyNode);
  return initScanNode(pPlanNode, pTable, op, nodeSize);
}

169
/* Create an OP_TagScan node; the table info is taken from the plan node's pExtInfo. */
static SPhyNode* createTagScanNode(SQueryPlanNode* pPlanNode) {
  return createPseudoScanNode(pPlanNode, (SQueryTableInfo*)pPlanNode->pExtInfo, OP_TagScan);
}

/* Scan flag for a table scan. Placeholder: always MAIN_SCAN, both arguments are ignored. */
static uint8_t getScanFlag(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
  // todo
  const uint8_t flag = MAIN_SCAN;
  return flag;
}

/*
 * Create a table scan node for operator `op`, carrying the scan flag and the
 * time window recorded in the table info.
 */
static SPhyNode* createUserTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t op) {
  SPhyNode* pBase = initScanNode(pPlanNode, pTable, op, sizeof(STableScanPhyNode));
  STableScanPhyNode* pScan = (STableScanPhyNode*)pBase;

  pScan->scanFlag = getScanFlag(pPlanNode, pTable);
  pScan->window = pTable->window;
  // todo tag cond
  return pBase;
}


/* Whether pTable is a system table. Placeholder: always false, the argument is ignored. */
static bool isSystemTable(SQueryTableInfo* pTable) {
  // todo
  const bool systemTable = false;
  return systemTable;
}

/* Whether the plan node requires a sequential scan. Placeholder: always false, the argument is ignored. */
static bool needSeqScan(SQueryPlanNode* pPlanNode) {
  // todo
  const bool seqScan = false;
  return seqScan;
}

/*
 * Create the scan node used by each per-vgroup subplan.
 *
 * System tables get an OP_SystemTableScan pseudo scan; otherwise a user table
 * scan is created with OP_TableSeqScan when a sequential scan is required and
 * OP_DataBlocksOptScan when it is not.
 */
static SPhyNode* createMultiTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
  if (isSystemTable(pTable)) {
    return createPseudoScanNode(pPlanNode, pTable, OP_SystemTableScan);
  }

  const int32_t op = needSeqScan(pPlanNode) ? OP_TableSeqScan : OP_DataBlocksOptScan;
  return createUserTableScanNode(pPlanNode, pTable, op);
}

/*
 * Create a subplan and register it in the DAG being built.
 *
 * The subplan takes the context's next id (the subplanId counter is then
 * advanced). When the context has a current subplan, the new one is placed
 * one level below it and the two are linked as child / parent. The subplan
 * is appended to the DAG's array for its level (created on demand), becomes
 * the context's current subplan, and the DAG's subplan counter is bumped.
 *
 * @param pCxt  planner context; pCxt->pDag->pSubplans must already exist
 * @param type  query type stored in the subplan
 * @return the new subplan; allocation failures are raised through validPointer()
 */
static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
  SSubplan* pNew = validPointer(calloc(1, sizeof(SSubplan)));
  pNew->id = pCxt->nextId;
  ++(pCxt->nextId.subplanId);

  pNew->type = type;
  pNew->level = 0;

  SSubplan* pParent = pCxt->pCurrentSubplan;
  if (pParent != NULL) {
    pNew->level = pParent->level + 1;

    // Link parent -> child, creating the parent's child list on first use.
    if (NULL == pParent->pChildren) {
      pParent->pChildren = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
    }
    taosArrayPush(pParent->pChildren, &pNew);

    // Link child -> parent.
    pNew->pParents = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
    taosArrayPush(pNew->pParents, &pParent);
  }

  // Locate (or create) the per-level array this subplan belongs to.
  SArray* pLevels = pCxt->pDag->pSubplans;
  SArray* pLevel = NULL;
  if (pNew->level < taosArrayGetSize(pLevels)) {
    pLevel = taosArrayGetP(pLevels, pNew->level);
  } else {
    pLevel = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
    taosArrayPush(pLevels, &pLevel);
  }

  taosArrayPush(pLevel, &pNew);
  pCxt->pCurrentSubplan = pNew;
  ++(pCxt->pDag->numOfSubplans);
  return pNew;
}

X
Xiaoyu Wang 已提交
239 240
/*
 * Fill an execution-node address from a vgroup info record: the vgroup id,
 * the in-use endpoint index, the endpoint count and the first numOfEps
 * endpoint addresses.
 */
static void vgroupInfoToEpSet(const SVgroupInfo* vg, SQueryNodeAddr* execNode) {
  execNode->nodeId = vg->vgId;
  execNode->inUse = vg->inUse;
  execNode->numOfEps = vg->numOfEps;

  int8_t idx = 0;
  while (idx < vg->numOfEps) {
    execNode->epAddr[idx] = vg->epAddr[idx];
    ++idx;
  }
}

X
Xiaoyu Wang 已提交
249 250 251 252
/*
 * Fill an execution-node address from a vgroup message: the vgroup id, the
 * endpoint count and the first numOfEps endpoint addresses. The in-use
 * endpoint index is always set to 0.
 */
static void vgroupMsgToEpSet(const SVgroupMsg* vg, SQueryNodeAddr* execNode) {
  execNode->nodeId = vg->vgId;
  execNode->inUse = 0; // todo
  execNode->numOfEps = vg->numOfEps;

  int8_t idx = 0;
  while (idx < vg->numOfEps) {
    execNode->epAddr[idx] = vg->epAddr[idx];
    ++idx;
  }
}

259
/*
 * Create one scan subplan per vgroup of the table.
 *
 * Each subplan is created below the context's current subplan, targets the
 * vgroup's endpoints, and consists of a multi-table scan node feeding a
 * dispatch sink. The context's current subplan is restored after each one.
 *
 * @return the context's template id as it was on entry; the counter is
 *         advanced by one afterwards
 */
static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
  SVgroupsInfo* vgroupList = pTable->pMeta->vgroupList;

  for (int32_t i = 0; i < vgroupList->numOfVgroups; ++i) {
    STORE_CURRENT_SUBPLAN(pCxt);

    SSubplan* pScanPlan = initSubplan(pCxt, QUERY_TYPE_SCAN);
    pScanPlan->msgType = TDMT_VND_QUERY;
    vgroupMsgToEpSet(&(vgroupList->vgroups[i]), &pScanPlan->execNode);
    pScanPlan->pNode = createMultiTableScanNode(pPlanNode, pTable);
    pScanPlan->pDataSink = createDataDispatcher(pCxt, pPlanNode, pScanPlan->pNode);

    RECOVERY_CURRENT_SUBPLAN(pCxt);
  }

  const uint64_t templateId = pCxt->nextId.templateId;
  ++(pCxt->nextId.templateId);
  return templateId;
}

273 274 275
/*
 * Create an exchange node that reads from the subplans identified by
 * srcTemplateId. The source endpoint list starts empty. pCxt is not used.
 */
static SPhyNode* createExchangeNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, uint64_t srcTemplateId) {
  SPhyNode* pBase = initPhyNode(pPlanNode, OP_Exchange, sizeof(SExchangePhyNode));
  SExchangePhyNode* pExchange = (SExchangePhyNode*)pBase;

  pExchange->srcTemplateId = srcTemplateId;
  pExchange->pSrcEndPoints = validPointer(taosArrayInit(TARRAY_MIN_SIZE, sizeof(SQueryNodeAddr)));
  return pBase;
}
279

280 281 282
/* A scan is split across nodes exactly when the table is a super table. */
static bool needMultiNodeScan(SQueryTableInfo* pTable) {
  // todo system table, for instance, user_tables
  const bool isSuperTable = (pTable->pMeta->pTableMeta->tableType == TSDB_SUPER_TABLE);
  return isSuperTable;
}

D
dapan 已提交
285 286 287 288 289 290
/*
 * Create an OP_TableScan node for a table served by a single vgroup, and point
 * the given subplan's execution node at the table's first vgroup.
 */
static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, SSubplan* subplan) {
  SQueryNodeAddr* pExecNode = &subplan->execNode;
  vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[0]), pExecNode);

  return createUserTableScanNode(pPlanNode, pTable, OP_TableScan);
}

291
/*
 * Create the physical node for a logical table scan.
 *
 * A table that needs a multi-node scan is split into per-vgroup subplans and
 * represented here by an exchange node reading from them; any other table is
 * scanned directly inside the current subplan.
 */
static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
  SQueryTableInfo* pTable = (SQueryTableInfo*)pPlanNode->pExtInfo;

  if (!needMultiNodeScan(pTable)) {
    return createSingleTableScanNode(pPlanNode, pTable, pCxt->pCurrentSubplan);
  }

  const uint64_t srcTemplateId = splitSubplanByTable(pCxt, pPlanNode, pTable);
  return createExchangeNode(pCxt, pPlanNode, srcTemplateId);
}

/*
 * Recursively translate a logical plan node (and its children) into a tree of
 * physical nodes.
 *
 * Tag scans and table scans produce a physical node. Project and modify nodes
 * currently have no physical operator, so NULL is returned for them and their
 * children are not visited; previously such a node with children dereferenced
 * a NULL pointer. Children that translate to NULL are skipped instead of
 * being linked into the parent.
 *
 * @param pCxt       planner context (may receive new subplans for split scans)
 * @param pPlanNode  logical plan node to translate
 * @return the physical node, or NULL when the logical node has no physical
 *         counterpart; allocation failures are raised through validPointer()
 */
static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
  SPhyNode* node = NULL;
  switch (pPlanNode->info.type) {
    case QNODE_TAGSCAN:
      node = createTagScanNode(pPlanNode);
      break;
    case QNODE_TABLESCAN:
      node = createTableScanNode(pCxt, pPlanNode);
      break;
    case QNODE_PROJECT:
      // todo: no physical operator is created for a projection yet.
      break;
    case QNODE_MODIFY:
      // Insert is not an operator in a physical plan.
      break;
    default:
      assert(false);
      break;
  }

  if (NULL == node) {
    // Nothing to attach children to.
    return NULL;
  }

  if (NULL == pPlanNode->pChildren) {
    return node;
  }

  const int32_t numOfChildren = (int32_t)taosArrayGetSize(pPlanNode->pChildren);
  if (numOfChildren > 0) {
    node->pChildren = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
    for (int32_t i = 0; i < numOfChildren; ++i) {
      SPhyNode* child = createPhyNode(pCxt, taosArrayGetP(pPlanNode->pChildren, i));
      if (NULL == child) {
        continue;
      }
      child->pParent = node;
      taosArrayPush(node->pChildren, &child);
    }
  }

  return node;
}

H
Haojun Liao 已提交
331 332 333 334 335
/*
 * Create one modify subplan per vgroup payload of a modification node.
 *
 * Each subplan targets the payload's vgroup, has no physical node, and owns an
 * insert sink built from the payload blocks. The context's current subplan is
 * restored after each one.
 */
static void splitModificationOpSubPlan(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
  SDataPayloadInfo* pPayload = (SDataPayloadInfo*)pPlanNode->pExtInfo;
  const size_t numOfVgroups = taosArrayGetSize(pPayload->payload);

  for (int32_t vgIdx = 0; vgIdx < numOfVgroups; ++vgIdx) {
    STORE_CURRENT_SUBPLAN(pCxt);

    SSubplan* pModifyPlan = initSubplan(pCxt, QUERY_TYPE_MODIFY);
    SVgDataBlocks* pBlocks = (SVgDataBlocks*)taosArrayGetP(pPayload->payload, vgIdx);

    vgroupInfoToEpSet(&pBlocks->vg, &pModifyPlan->execNode);
    pModifyPlan->pDataSink = createDataInserter(pCxt, pBlocks, NULL);
    pModifyPlan->pNode = NULL;
    pModifyPlan->type = QUERY_TYPE_MODIFY;
    pModifyPlan->msgType = pPayload->msgType;
    pModifyPlan->id.queryId = pCxt->pDag->queryId;

    RECOVERY_CURRENT_SUBPLAN(pCxt);
  }
}

351
/*
 * Build the subplans for a logical plan rooted at pRoot.
 *
 * A modification root is split into per-vgroup modify subplans. Any other
 * root becomes a single scan subplan whose physical tree is built from pRoot
 * and whose results go to a dispatch sink.
 */
static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) {
  // todo deal subquery
  if (QNODE_MODIFY == pRoot->info.type) {
    splitModificationOpSubPlan(pCxt, pRoot);
    return;
  }

  SSubplan* pPlan = initSubplan(pCxt, QUERY_TYPE_SCAN);
  ++(pCxt->nextId.templateId);

  pPlan->msgType = TDMT_VND_QUERY;
  pPlan->pNode = createPhyNode(pCxt, pRoot);
  pPlan->pDataSink = createDataDispatcher(pCxt, pRoot, pPlan->pNode);
}

365
/*
 * Build the physical-plan DAG for a logical query plan.
 *
 * @param pQueryNode  root of the logical plan
 * @param pCatalog    catalog handle stored in the planner context
 * @param pDag        receives the newly allocated DAG; it is assigned before
 *                    the subplans are built, so after a failure it may refer
 *                    to a partially built DAG
 * @param requestId   used as the DAG's query id and as the query id of every
 *                    subplan id
 * @return TSDB_CODE_SUCCESS on success; TSDB_CODE_FAILED with terrno set to
 *         the thrown code when building the plan raised an error
 */
int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag, uint64_t requestId) {
  TRY(TSDB_MAX_TAG_CONDITIONS) {
    SQueryDag* pNewDag = validPointer(calloc(1, sizeof(SQueryDag)));
    SPlanContext context = {
      .pCatalog = pCatalog,
      .pDag = pNewDag,
      .pCurrentSubplan = NULL,
      .nextId = {.queryId = requestId},
    };

    *pDag = pNewDag;
    pNewDag->queryId = requestId;

    pNewDag->pSubplans = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
    createSubplanByLevel(&context, pQueryNode);
  } CATCH(code) {
    CLEANUP_EXECUTE();
    terrno = code;
    return TSDB_CODE_FAILED;
  } END_TRY
  return TSDB_CODE_SUCCESS;
}
X
Xiaoyu Wang 已提交
386

387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406
/*
 * Walk the physical tree rooted at pNode and append the endpoint pEp to the
 * source list of every exchange node whose srcTemplateId equals templateId.
 * A NULL pNode is ignored.
 */
void setExchangSourceNode(uint64_t templateId, SQueryNodeAddr* pEp, SPhyNode* pNode) {
  if (NULL == pNode) {
    return;
  }

  if (OP_Exchange == pNode->info.type) {
    SExchangePhyNode* pExchange = (SExchangePhyNode*)pNode;
    if (pExchange->srcTemplateId == templateId) {
      taosArrayPush(pExchange->pSrcEndPoints, pEp);
    }
  }

  if (NULL == pNode->pChildren) {
    return;
  }

  // Recurse into every child subtree.
  const size_t numOfChildren = taosArrayGetSize(pNode->pChildren);
  int32_t childIdx = 0;
  while (childIdx < numOfChildren) {
    setExchangSourceNode(templateId, pEp, taosArrayGetP(pNode->pChildren, childIdx));
    ++childIdx;
  }
}

/*
 * Register the endpoint pEp as a source of every exchange node in the
 * subplan's physical tree that reads from template templateId.
 */
void setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* pEp) {
  SPhyNode* pRoot = subplan->pNode;
  setExchangSourceNode(templateId, pEp, pRoot);
}