physicalPlan.c 14.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "plannerInt.h"
X
Xiaoyu Wang 已提交
17
#include "exception.h"
18
#include "parser.h"
19

20 21 22
/*
 * Save / restore the planner context's current subplan around code that
 * replaces it (initSubplan() makes the new subplan current).
 *
 * STORE_CURRENT_SUBPLAN declares a local variable named `_` in the enclosing
 * scope, so it can be used at most once per scope and RECOVERY_CURRENT_SUBPLAN
 * must be used in that same scope. The argument is parenthesized so that any
 * pointer-valued expression (not just a plain identifier) expands correctly.
 */
#define STORE_CURRENT_SUBPLAN(cxt) SSubplan* _ = (cxt)->pCurrentSubplan
#define RECOVERY_CURRENT_SUBPLAN(cxt) (cxt)->pCurrentSubplan = _

23 24 25 26 27 28 29
// Mutable state threaded through the physical-plan construction in this file.
typedef struct SPlanContext {
  // Catalog handle supplied to createDag(); the code visible here only stores it.
  struct SCatalog* pCatalog;
  // DAG being built; initSubplan() registers every new subplan in its per-level arrays.
  struct SQueryDag* pDag;
  // Subplan that newly created subplans are linked under as children (NULL before the first one).
  SSubplan* pCurrentSubplan;
  // Id copied into the next subplan; its subplanId / templateId counters are advanced as plans are created.
  SSubplanId nextId;
} SPlanContext;

30 31 32 33 34 35
// Operator display names indexed by operator type. Slot 0 is "Unknown"; the
// remaining entries are generated by including plannerOp.h with INCLUDE_AS_NAME
// defined (presumably one string literal per operator -- confirm in plannerOp.h).
static const char* gOpName[] = {
  "Unknown",
#define INCLUDE_AS_NAME
#include "plannerOp.h"
#undef INCLUDE_AS_NAME
};
X
Xiaoyu Wang 已提交
36

37
static void* validPointer(void* p) {
X
Xiaoyu Wang 已提交
38 39 40 41 42 43
  if (NULL == p) {
    THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
  }
  return p;
}

44 45 46 47
/*
 * Map an operator type to its display name.
 *
 * @param type  index into gOpName.
 * @return the matching name; gOpName[0] ("Unknown") when `type` is negative or
 *         beyond the end of the table, instead of reading out of bounds.
 */
const char* opTypeToOpName(int32_t type) {
  const int32_t numOfNames = (int32_t)(sizeof(gOpName) / sizeof(gOpName[0]));
  if (type < 0 || type >= numOfNames) {
    return gOpName[0];
  }
  return gOpName[type];
}

48 49
/*
 * Map an operator display name back to its operator type.
 *
 * @param name  NUL-terminated operator name; NULL is treated as unknown.
 * @return the index of the matching gOpName entry (slot 0 is never searched),
 *         or OP_Unknown when nothing matches.
 */
int32_t opNameToOpType(const char* name) {
  if (NULL == name) {
    return OP_Unknown;
  }
  // Convert the element count once so the loop compares signed with signed.
  const int32_t numOfNames = (int32_t)(sizeof(gOpName) / sizeof(gOpName[0]));
  for (int32_t i = 1; i < numOfNames; ++i) {
    if (0 == strcmp(name, gOpName[i])) {
      return i;
    }
  }
  return OP_Unknown;
}
X
Xiaoyu Wang 已提交
56

57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
/*
 * Return the display name of a data sink type: "Dispatch" for DSINK_Dispatch,
 * "Insert" for DSINK_Insert and "Unknown" for every other value.
 */
const char* dsinkTypeToDsinkName(int32_t type) {
  if (DSINK_Dispatch == type) {
    return "Dispatch";
  }
  if (DSINK_Insert == type) {
    return "Insert";
  }
  return "Unknown";
}

/*
 * Inverse of dsinkTypeToDsinkName(): "Dispatch" -> DSINK_Dispatch,
 * "Insert" -> DSINK_Insert, anything else -> DSINK_Unknown.
 * The comparison is case-sensitive (strcmp).
 */
int32_t dsinkNameToDsinkType(const char* name) {
  if (0 == strcmp(name, "Dispatch")) {
    return DSINK_Dispatch;
  }
  if (0 == strcmp(name, "Insert")) {
    return DSINK_Insert;
  }
  return DSINK_Unknown;
}

78 79 80 81 82 83 84 85 86 87
/*
 * Deep-copy a data block schema.
 *
 * Allocates a new slot array for `dst` sized from src->numOfCols, copies the
 * slots and then the scalar fields (numOfCols, resultRowSize, precision).
 * Returns false when the allocation fails; in that case dst->pSchema is NULL
 * and the scalar fields of `dst` are left untouched.
 */
static bool copySchema(SDataBlockSchema* dst, const SDataBlockSchema* src) {
  const size_t bytes = sizeof(SSlotSchema) * src->numOfCols;
  void* slots = malloc(bytes);

  dst->pSchema = slots;
  if (NULL == slots) {
    return false;
  }

  memcpy(slots, src->pSchema, bytes);
  dst->precision = src->precision;
  dst->resultRowSize = src->resultRowSize;
  dst->numOfCols = src->numOfCols;
  return true;
}

90
/*
 * Build the output schema of a plan node from its expression list.
 *
 * One slot is allocated per expression (pPlanNode->numOfExpr); each slot is
 * copied from the expression's base.resSchema, and resultRowSize becomes the
 * sum of the slots' `bytes`. Returns false if the slot array cannot be
 * allocated (numOfCols has already been written at that point).
 */
static bool toDataBlockSchema(SQueryPlanNode* pPlanNode, SDataBlockSchema* dataBlockSchema) {
  dataBlockSchema->numOfCols = pPlanNode->numOfExpr;
  dataBlockSchema->pSchema = malloc(sizeof(SSlotSchema) * pPlanNode->numOfExpr);
  if (NULL == dataBlockSchema->pSchema) {
    return false;
  }

  dataBlockSchema->resultRowSize = 0;
  int32_t idx = 0;
  while (idx < pPlanNode->numOfExpr) {
    SExprInfo* expr = taosArrayGetP(pPlanNode->pExpr, idx);
    memcpy(dataBlockSchema->pSchema + idx, &expr->base.resSchema, sizeof(SSlotSchema));
    dataBlockSchema->resultRowSize += dataBlockSchema->pSchema[idx].bytes;
    ++idx;
  }
  return true;
}

/*
 * Clone an expression array into *dst.
 *
 * A NULL or empty `src` is a successful no-op and leaves *dst untouched.
 * Otherwise *dst receives a new pointer array filled by copyAllExprInfo().
 * Returns false when the array cannot be created or the copy does not report
 * TSDB_CODE_SUCCESS; *dst is not released here on a failed copy.
 */
static bool cloneExprArray(SArray** dst, SArray* src) {
  const size_t numOfExprs = (NULL == src) ? 0 : taosArrayGetSize(src);
  if (0 == numOfExprs) {
    return true;
  }

  *dst = taosArrayInit(numOfExprs, POINTER_BYTES);
  if (NULL == *dst) {
    return false;
  }
  return TSDB_CODE_SUCCESS == copyAllExprInfo(*dst, src, true);
}

123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
/*
 * Allocate a zeroed data sink of `size` bytes and fill in its common header.
 *
 * @param type   sink type; also determines info.name via dsinkTypeToDsinkName().
 * @param size   allocation size of the concrete sink structure.
 * @param pRoot  optional root physical node; when non-NULL its targetSchema is
 *               deep-copied into the sink.
 * Throws TSDB_CODE_TSC_OUT_OF_MEMORY if the sink or the schema copy cannot be
 * allocated (the sink itself is released first in the latter case).
 */
static SDataSink* initDataSink(int32_t type, int32_t size, const SPhyNode* pRoot) {
  SDataSink* sink = validPointer(calloc(1, size));
  sink->info.type = type;
  sink->info.name = dsinkTypeToDsinkName(type);

  if (NULL == pRoot) {
    return sink;
  }
  if (copySchema(&sink->schema, &pRoot->targetSchema)) {
    return sink;
  }

  tfree(sink);
  THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
  return sink;
}

/*
 * Create an insert sink for one vgroup's data blocks.
 *
 * Copies numOfTables and size from `pBlocks` and exchanges the pData pointers
 * of the inserter and the block through SWAP. The inserter comes zeroed from
 * calloc, so the block presumably ends up with a NULL pData -- confirm against
 * SWAP's definition. `pCxt` is currently unused.
 */
static SDataSink* createDataInserter(SPlanContext* pCxt, SVgDataBlocks* pBlocks, const SPhyNode* pRoot) {
  SDataSink* sink = initDataSink(DSINK_Insert, sizeof(SDataInserter), pRoot);
  SDataInserter* inserter = (SDataInserter*)sink;

  SWAP(inserter->pData, pBlocks->pData, char*);
  inserter->size = pBlocks->size;
  inserter->numOfTables = pBlocks->numOfTables;
  return sink;
}

/*
 * Create a dispatch sink whose schema mirrors `pRoot` (when pRoot is non-NULL).
 * `pCxt` and `pPlanNode` are currently unused.
 */
static SDataSink* createDataDispatcher(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, const SPhyNode* pRoot) {
  return initDataSink(DSINK_Dispatch, sizeof(SDataDispatcher), pRoot);
}

147
/*
 * Allocate a zeroed physical node of `size` bytes and initialise the parts
 * common to every operator: type, name, a clone of the logical node's
 * expressions (pTargets) and the derived target schema.
 *
 * Throws TSDB_CODE_TSC_OUT_OF_MEMORY if any step fails. Only the node itself
 * is freed on that path; anything already attached to it is not released here.
 */
static SPhyNode* initPhyNode(SQueryPlanNode* pPlanNode, int32_t type, int32_t size) {
  SPhyNode* node = validPointer(calloc(1, size));
  node->info.type = type;
  node->info.name = opTypeToOpName(type);

  // The schema is only built when the expression clone succeeded.
  bool ok = cloneExprArray(&node->pTargets, pPlanNode->pExpr);
  if (ok) {
    ok = toDataBlockSchema(pPlanNode, &(node->targetSchema));
  }
  if (!ok) {
    free(node);
    THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
  }
  return node;
}

158
/*
 * Create a scan-family physical node and fill the fields shared by all scans:
 * uid and tableType come from the table meta, count is fixed to 1 and the
 * order to TSDB_ORDER_ASC.
 */
static SPhyNode* initScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t type, int32_t size) {
  SPhyNode* base = initPhyNode(pPlanNode, type, size);
  SScanPhyNode* scan = (SScanPhyNode*)base;
  STableMeta* meta = pTable->pMeta->pTableMeta;

  scan->tableType = meta->tableType;
  scan->uid = meta->uid;
  scan->order = TSDB_ORDER_ASC;
  scan->count = 1;
  return base;
}

/*
 * Create a scan node that carries only the generic SScanPhyNode fields, for
 * the operator type given in `op`.
 */
static SPhyNode* createPseudoScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t op) {
  const int32_t nodeSize = sizeof(SScanPhyNode);
  return initScanNode(pPlanNode, pTable, op, nodeSize);
}

174
/*
 * Create an OP_TagScan node; the table info is taken from the plan node's
 * pExtInfo.
 */
static SPhyNode* createTagScanNode(SQueryPlanNode* pPlanNode) {
  return createPseudoScanNode(pPlanNode, (SQueryTableInfo*)pPlanNode->pExtInfo, OP_TagScan);
}

/*
 * Scan flag for a table scan. Placeholder: always MAIN_SCAN.
 */
static uint8_t getScanFlag(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
  // todo: both arguments are ignored until the real rule is implemented
  (void)pPlanNode;
  (void)pTable;
  const uint8_t flag = MAIN_SCAN;
  return flag;
}

H
Haojun Liao 已提交
184 185 186 187
/*
 * Create a table scan node of operator type `op`, adding the scan flag and
 * the query time window on top of the generic scan fields.
 */
static SPhyNode* createUserTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pQueryTableInfo, int32_t op) {
  SPhyNode* base = initScanNode(pPlanNode, pQueryTableInfo, op, sizeof(STableScanPhyNode));
  STableScanPhyNode* scan = (STableScanPhyNode*)base;

  scan->window = pQueryTableInfo->window;
  scan->scanFlag = getScanFlag(pPlanNode, pQueryTableInfo);
  // todo tag cond
  return base;
}


/*
 * Whether `pTable` is a system table. Placeholder: always false.
 */
static bool isSystemTable(SQueryTableInfo* pTable) {
  // todo
  (void)pTable;
  const bool systemTable = false;
  return systemTable;
}

/*
 * Whether the plan node requires a sequential table scan. Placeholder: always
 * false.
 */
static bool needSeqScan(SQueryPlanNode* pPlanNode) {
  // todo
  (void)pPlanNode;
  const bool seqScan = false;
  return seqScan;
}

/*
 * Pick the scan operator for one vgroup of a multi-node scan:
 * system table -> OP_SystemTableScan (generic scan node),
 * sequential scan needed -> OP_TableSeqScan,
 * otherwise -> OP_DataBlocksOptScan.
 */
static SPhyNode* createMultiTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
  if (isSystemTable(pTable)) {
    return createPseudoScanNode(pPlanNode, pTable, OP_SystemTableScan);
  }
  const int32_t op = needSeqScan(pPlanNode) ? OP_TableSeqScan : OP_DataBlocksOptScan;
  return createUserTableScanNode(pPlanNode, pTable, op);
}

/*
 * Allocate a new subplan, link it into the DAG and make it the current one.
 *
 * - The subplan takes the context's nextId; nextId.subplanId is then advanced.
 * - With a current subplan present, the new one becomes its child (level is
 *   parent level + 1) and records it as parent; otherwise the level is 0.
 * - The subplan is appended to the DAG's array for its level, creating that
 *   array when the level does not exist yet.
 * - pCxt->pCurrentSubplan is switched to the new subplan and the DAG's
 *   numOfSubplans is incremented.
 *
 * Allocation failures are raised through validPointer().
 */
static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
  SSubplan* subplan = validPointer(calloc(1, sizeof(SSubplan)));
  SSubplan* parent = pCxt->pCurrentSubplan;

  subplan->id = pCxt->nextId;
  ++(pCxt->nextId.subplanId);

  subplan->type = type;
  subplan->level = (NULL == parent) ? 0 : parent->level + 1;

  if (NULL != parent) {
    if (NULL == parent->pChildren) {
      parent->pChildren = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
    }
    taosArrayPush(parent->pChildren, &subplan);

    subplan->pParents = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
    taosArrayPush(subplan->pParents, &parent);
  }

  // Fetch the per-level array, appending a fresh one for a level not seen before.
  SArray* levels = pCxt->pDag->pSubplans;
  SArray* currentLevel = NULL;
  if (subplan->level < taosArrayGetSize(levels)) {
    currentLevel = taosArrayGetP(levels, subplan->level);
  } else {
    currentLevel = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
    taosArrayPush(levels, &currentLevel);
  }
  taosArrayPush(currentLevel, &subplan);

  pCxt->pCurrentSubplan = subplan;
  ++(pCxt->pDag->numOfSubplans);
  return subplan;
}

X
Xiaoyu Wang 已提交
244 245
/*
 * Fill an execution node address from an SVgroupInfo: vgroup id, in-use
 * endpoint index, endpoint count and the first numOfEps endpoint addresses.
 */
static void vgroupInfoToEpSet(const SVgroupInfo* vg, SQueryNodeAddr* execNode) {
  execNode->nodeId = vg->vgId;
  execNode->inUse = vg->inUse;
  execNode->numOfEps = vg->numOfEps;

  int8_t ep = 0;
  while (ep < vg->numOfEps) {
    execNode->epAddr[ep] = vg->epAddr[ep];
    ++ep;
  }
}

X
Xiaoyu Wang 已提交
254 255 256 257
/*
 * Fill an execution node address from an SVgroupMsg: vgroup id, endpoint
 * count and the first numOfEps endpoint addresses. inUse is always set to 0.
 */
static void vgroupMsgToEpSet(const SVgroupMsg* vg, SQueryNodeAddr* execNode) {
  execNode->nodeId = vg->vgId;
  execNode->inUse = 0; // todo
  execNode->numOfEps = vg->numOfEps;

  int8_t ep = 0;
  while (ep < vg->numOfEps) {
    execNode->epAddr[ep] = vg->epAddr[ep];
    ++ep;
  }
}

264
/*
 * Create one scan subplan per vgroup of `pTable`.
 *
 * Each subplan is created under the subplan that was current on entry, gets
 * the vgroup's endpoints as execution node, a multi-table scan node and a
 * dispatch sink. The context's current subplan is put back after every
 * iteration, so all scan subplans become siblings.
 *
 * @return the template id shared by the created subplans (the value of
 *         nextId.templateId on entry); the counter is advanced afterwards.
 */
static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
  SVgroupsInfo* pVgroupList = pTable->pMeta->vgroupList;
  SSubplan* const parent = pCxt->pCurrentSubplan;

  for (int32_t vg = 0; vg < pVgroupList->numOfVgroups; ++vg) {
    SSubplan* scan = initSubplan(pCxt, QUERY_TYPE_SCAN);
    scan->msgType = TDMT_VND_QUERY;
    vgroupMsgToEpSet(&(pVgroupList->vgroups[vg]), &scan->execNode);
    scan->pNode = createMultiTableScanNode(pPlanNode, pTable);
    scan->pDataSink = createDataDispatcher(pCxt, pPlanNode, scan->pNode);
    pCxt->pCurrentSubplan = parent;
  }

  const uint64_t templateId = pCxt->nextId.templateId;
  pCxt->nextId.templateId++;
  return templateId;
}

278 279 280
/*
 * Create an OP_Exchange node that reads from the subplans sharing
 * `srcTemplateId`. The source endpoint array starts empty; it is filled later
 * by setExchangSourceNode(). `pCxt` is currently unused.
 */
static SPhyNode* createExchangeNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, uint64_t srcTemplateId) {
  SPhyNode* base = initPhyNode(pPlanNode, OP_Exchange, sizeof(SExchangePhyNode));
  SExchangePhyNode* exchange = (SExchangePhyNode*)base;

  exchange->srcTemplateId = srcTemplateId;
  exchange->pSrcEndPoints = validPointer(taosArrayInit(TARRAY_MIN_SIZE, sizeof(SDownstreamSource)));
  return base;
}
284

285 286 287
/*
 * A scan is split across nodes exactly when the table is a super table.
 */
static bool needMultiNodeScan(SQueryTableInfo* pTable) {
  // todo system table, for instance, user_tables
  STableMeta* meta = pTable->pMeta->pTableMeta;
  return (meta->tableType == TSDB_SUPER_TABLE);
}

D
dapan 已提交
290 291 292
/*
 * Create the scan node for a table served by a single vgroup.
 *
 * The first vgroup of the table becomes the execution node of `subplan`.
 * A QNODE_TABLESCAN plan node yields OP_TableScan; any other type reaching
 * this function yields OP_StreamScan.
 */
static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, SSubplan* subplan) {
  vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[0]), &subplan->execNode);

  if (QNODE_TABLESCAN == pPlanNode->info.type) {
    return createUserTableScanNode(pPlanNode, pTable, OP_TableScan);
  }
  return createUserTableScanNode(pPlanNode, pTable, OP_StreamScan);
}

297
/*
 * Create the physical node for a table/stream scan.
 *
 * When the table needs a multi-node scan, per-vgroup scan subplans are split
 * off first and an exchange node referring to their template id is returned.
 * Otherwise a single scan node is created inside the current subplan.
 */
static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
  SQueryTableInfo* pTable = (SQueryTableInfo*)pPlanNode->pExtInfo;
  if (!needMultiNodeScan(pTable)) {
    return createSingleTableScanNode(pPlanNode, pTable, pCxt->pCurrentSubplan);
  }

  const uint64_t srcTemplateId = splitSubplanByTable(pCxt, pPlanNode, pTable);
  return createExchangeNode(pCxt, pPlanNode, srcTemplateId);
}

305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323
/*
 * Create an OP_Aggregate node with split mode AGG_SPLIT_FINAL.
 *
 * Without group-by info (pExtInfo is NULL) the algorithm is AGG_ALGO_PLAIN.
 * With it, the algorithm is AGG_ALGO_HASHED and the group-by column list is
 * duplicated into the node. `pCxt` is currently unused.
 */
static SPhyNode* createSingleTableAgg(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
  SPhyNode* base = initPhyNode(pPlanNode, OP_Aggregate, sizeof(SAggPhyNode));
  SAggPhyNode* agg = (SAggPhyNode*)base;
  SGroupbyExpr* pGroupBy = (SGroupbyExpr*)pPlanNode->pExtInfo;

  agg->aggSplit = AGG_SPLIT_FINAL;
  if (NULL == pGroupBy) {
    agg->aggAlgo = AGG_ALGO_PLAIN;
    return base;
  }

  agg->aggAlgo = AGG_ALGO_HASHED;
  agg->pGroupByList = validPointer(taosArrayDup(pGroupBy->columnInfo));
  return base;
}

/*
 * Create the physical node for an aggregate / group-by plan node.
 */
static SPhyNode* createAggNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
  // todo: a multi-node branch (needMultiNodeAgg(pPlanNode)) is not implemented yet
  SPhyNode* agg = createSingleTableAgg(pCxt, pPlanNode);
  return agg;
}

324
/*
 * Recursively translate a logical plan node (and its children) into a
 * physical node tree.
 *
 * @param pCxt       planner context; scan nodes may create extra subplans in it.
 * @param pPlanNode  logical node to translate.
 * @return the physical node, or NULL when the logical node has no physical
 *         counterpart (QNODE_MODIFY, or an unknown type in builds where
 *         assert() is compiled out).
 *
 * When no physical node is produced, the children are not translated, since
 * there is nothing to attach them to. The children array allocation goes
 * through validPointer(), so a failure is thrown rather than pushed into a
 * NULL array. A child that itself translates to NULL is skipped.
 */
static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
  SPhyNode* node = NULL;
  switch (pPlanNode->info.type) {
    case QNODE_TAGSCAN:
      node = createTagScanNode(pPlanNode);
      break;
    case QNODE_STREAMSCAN:
    case QNODE_TABLESCAN:
      node = createTableScanNode(pCxt, pPlanNode);
      break;
    case QNODE_AGGREGATE:
    case QNODE_GROUPBY:
      node = createAggNode(pCxt, pPlanNode);
      break;
    case QNODE_MODIFY:
      // Insert is not an operator in a physical plan.
      break;
    default:
      assert(false);
  }

  if (NULL == node) {
    return NULL;
  }

  const size_t numOfChildren = (NULL == pPlanNode->pChildren) ? 0 : taosArrayGetSize(pPlanNode->pChildren);
  if (numOfChildren > 0) {
    node->pChildren = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
    for (size_t i = 0; i < numOfChildren; ++i) {
      SPhyNode* child = createPhyNode(pCxt, taosArrayGetP(pPlanNode->pChildren, i));
      if (NULL == child) {
        continue;
      }
      child->pParent = node;
      taosArrayPush(node->pChildren, &child);
    }
  }

  return node;
}

H
Haojun Liao 已提交
358 359 360 361 362
/*
 * Create one QUERY_TYPE_MODIFY subplan per entry of the payload array found in
 * the plan node's pExtInfo.
 *
 * Each subplan has no operator tree (pNode is NULL), executes on the vgroup
 * recorded in the data block entry, carries the payload's message type and an
 * insert sink built from the blocks. The context's current subplan is put back
 * after every iteration, so the subplans become siblings.
 */
static void splitModificationOpSubPlan(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
  SDataPayloadInfo* pPayload = (SDataPayloadInfo*) pPlanNode->pExtInfo;
  SSubplan* const parent = pCxt->pCurrentSubplan;

  const size_t numOfVgroups = taosArrayGetSize(pPayload->payload);
  int32_t i = 0;
  while (i < numOfVgroups) {
    SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MODIFY);
    SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(pPayload->payload, i);

    vgroupInfoToEpSet(&blocks->vg, &subplan->execNode);
    subplan->pDataSink = createDataInserter(pCxt, blocks, NULL);
    subplan->pNode = NULL;
    subplan->type = QUERY_TYPE_MODIFY;
    subplan->msgType = pPayload->msgType;
    subplan->id.queryId = pCxt->pDag->queryId;

    pCxt->pCurrentSubplan = parent;
    ++i;
  }
}

378
/*
 * Build the subplans for the logical plan rooted at `pRoot`.
 *
 * A QNODE_MODIFY root is split into per-vgroup modification subplans. Any
 * other root gets one QUERY_TYPE_SCAN subplan: the template id counter is
 * advanced before the operator tree is built (building it may allocate further
 * template ids), then the tree and a dispatch sink are attached.
 */
static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) {
  if (QNODE_MODIFY == pRoot->info.type) {
    splitModificationOpSubPlan(pCxt, pRoot);
    return;
  }

  SSubplan* top = initSubplan(pCxt, QUERY_TYPE_SCAN);
  ++(pCxt->nextId.templateId);

  top->msgType = TDMT_VND_QUERY;
  top->pNode = createPhyNode(pCxt, pRoot);
  top->pDataSink = createDataDispatcher(pCxt, pRoot, top->pNode);
  // todo deal subquery
}

392
/*
 * Build the physical query DAG for a logical plan.
 *
 * @param pQueryNode  root of the logical plan.
 * @param pCatalog    catalog handle stored in the planner context.
 * @param pDag        receives the newly allocated DAG.
 * @param requestId   used as the DAG's queryId and as queryId of every subplan id.
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_FAILED with terrno set to the thrown
 *         code when anything inside the TRY block throws.
 *
 * NOTE(review): on failure *pDag may already point at a partially built DAG;
 * whether the caller is expected to release it is not visible here.
 */
int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag, uint64_t requestId) {
  TRY(TSDB_MAX_TAG_CONDITIONS) {
    SPlanContext context = {0};
    context.pCatalog = pCatalog;
    context.pDag = validPointer(calloc(1, sizeof(SQueryDag)));
    context.nextId.queryId = requestId;

    SQueryDag* dag = context.pDag;
    *pDag = dag;
    dag->queryId = requestId;
    dag->pSubplans = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));

    createSubplanByLevel(&context, pQueryNode);
  } CATCH(code) {
    CLEANUP_EXECUTE();
    terrno = code;
    return TSDB_CODE_FAILED;
  } END_TRY
  return TSDB_CODE_SUCCESS;
}
X
Xiaoyu Wang 已提交
413

H
Haojun Liao 已提交
414
/*
 * Walk the physical node tree rooted at `pNode` and append `pSource` to the
 * source endpoint list of every exchange node whose srcTemplateId equals
 * `templateId`. A NULL `pNode` is ignored.
 */
void setExchangSourceNode(uint64_t templateId, SDownstreamSource *pSource, SPhyNode* pNode) {
  if (NULL == pNode) {
    return;
  }

  if (OP_Exchange == pNode->info.type && templateId == ((SExchangePhyNode*)pNode)->srcTemplateId) {
    taosArrayPush(((SExchangePhyNode*)pNode)->pSrcEndPoints, pSource);
  }

  if (NULL == pNode->pChildren) {
    return;
  }

  const size_t numOfChildren = taosArrayGetSize(pNode->pChildren);
  for (int32_t child = 0; child < numOfChildren; ++child) {
    setExchangSourceNode(templateId, pSource, taosArrayGetP(pNode->pChildren, child));
  }
}

H
Haojun Liao 已提交
433 434
/*
 * Register `pSource` as a downstream source on every exchange node of
 * `subplan` that reads from the subplans identified by `templateId`.
 */
void setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource) {
  SPhyNode* root = subplan->pNode;
  setExchangSourceNode(templateId, pSource, root);
}