physicalPlan.c 11.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "plannerInt.h"
X
Xiaoyu Wang 已提交
17
#include "exception.h"
18
#include "parser.h"
19

20 21 22
/*
 * Save and restore the context's current subplan around code that changes it
 * (initSubplan makes the subplan it creates the current one).
 *
 * STORE_CURRENT_SUBPLAN declares a local named `_`, so it may be used only once
 * per scope and must be paired with RECOVERY_CURRENT_SUBPLAN in that same scope.
 * The argument is parenthesized so that any pointer expression, e.g. `&context`,
 * expands correctly.
 */
#define STORE_CURRENT_SUBPLAN(cxt) SSubplan* _ = (cxt)->pCurrentSubplan
#define RECOVERY_CURRENT_SUBPLAN(cxt) (cxt)->pCurrentSubplan = _

23 24 25 26 27 28 29
/* State threaded through the helpers in this file while one query DAG is built. */
typedef struct SPlanContext {
  // Catalog handle passed in by the caller of createDag.
  struct SCatalog* pCatalog;
  // DAG under construction; initSubplan appends every new subplan to it.
  struct SQueryDag* pDag;
  // Subplan that newly created subplans are linked under; NULL while none exists.
  SSubplan* pCurrentSubplan;
  // Id copied into the next subplan; its subplanId/templateId fields are bumped as plans are created.
  SSubplanId nextId;
} SPlanContext;

30 31 32 33 34 35
/*
 * Operator names, indexed by operator type. Slot 0 is the "Unknown" placeholder;
 * the remaining entries come from including plannerOp.h with INCLUDE_AS_NAME
 * defined (presumably one string literal per operator -- confirm in plannerOp.h).
 */
static const char* gOpName[] = {
  "Unknown",
#define INCLUDE_AS_NAME
#include "plannerOp.h"
#undef INCLUDE_AS_NAME
};
X
Xiaoyu Wang 已提交
36

37
static void* validPointer(void* p) {
X
Xiaoyu Wang 已提交
38 39 40 41 42 43
  if (NULL == p) {
    THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
  }
  return p;
}

44 45 46 47
/*
 * Map an operator type to its display name.
 *
 * @param type  index into gOpName.
 * @return the name stored for `type`; the "Unknown" entry (slot 0) when `type`
 *         is negative or beyond the end of the table, instead of reading out of
 *         bounds.
 */
const char* opTypeToOpName(int32_t type) {
  const int32_t numOfNames = (int32_t)(sizeof(gOpName) / sizeof(gOpName[0]));
  if (type < 0 || type >= numOfNames) {
    return gOpName[0];
  }
  return gOpName[type];
}

48 49
/*
 * Reverse lookup of opTypeToOpName: find the operator type whose name equals
 * `name`. Slot 0 ("Unknown") is never matched.
 *
 * @param name  NUL-terminated operator name; must not be NULL.
 * @return the matching index in gOpName, or OP_Unknown when nothing matches.
 */
int32_t opNameToOpType(const char* name) {
  const int32_t total = (int32_t)(sizeof(gOpName) / sizeof(gOpName[0]));
  int32_t idx = 1;
  while (idx < total) {
    if (strcmp(name, gOpName[idx]) == 0) {
      return idx;
    }
    ++idx;
  }
  return OP_Unknown;
}
X
Xiaoyu Wang 已提交
56

57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
/*
 * Map a data sink type to its display name.
 *
 * @return "Dispatch" or "Insert" for the two known sink types, "Unknown" for
 *         anything else.
 */
const char* dsinkTypeToDsinkName(int32_t type) {
  if (DSINK_Dispatch == type) {
    return "Dispatch";
  }
  if (DSINK_Insert == type) {
    return "Insert";
  }
  return "Unknown";
}

/*
 * Reverse lookup of dsinkTypeToDsinkName.
 *
 * @param name  NUL-terminated sink name; must not be NULL.
 * @return DSINK_Dispatch or DSINK_Insert for the matching name, DSINK_Unknown
 *         otherwise.
 */
int32_t dsinkNameToDsinkType(const char* name) {
  int32_t type = DSINK_Unknown;
  if (strcmp(name, "Dispatch") == 0) {
    type = DSINK_Dispatch;
  } else if (strcmp(name, "Insert") == 0) {
    type = DSINK_Insert;
  }
  return type;
}

X
Xiaoyu Wang 已提交
78
static SDataSink* initDataSink(int32_t type, int32_t size) {
79
  SDataSink* sink = (SDataSink*)validPointer(calloc(1, size));
X
Xiaoyu Wang 已提交
80
  sink->info.type = type;
81
  sink->info.name = dsinkTypeToDsinkName(type);
X
Xiaoyu Wang 已提交
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
  return sink;
}

/*
 * Create a dispatcher sink. Neither parameter is read yet; only the common
 * sink header is initialized.
 */
static SDataSink* createDataDispatcher(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
  return initDataSink(DSINK_Dispatch, sizeof(SDataDispatcher));
}

/*
 * Create an insert sink describing the data blocks bound for one vgroup.
 *
 * Copies the table count and payload size from pBlocks, then SWAPs the payload
 * pointers. The sink was just zero-allocated, so (assuming SWAP exchanges its
 * two operands) pBlocks->pData ends up NULL and the sink holds the buffer.
 */
static SDataSink* createDataInserter(SPlanContext* pCxt, SVgDataBlocks* pBlocks) {
  SDataSink* pSink = initDataSink(DSINK_Insert, sizeof(SDataInserter));
  SDataInserter* pInserter = (SDataInserter*)pSink;
  pInserter->size = pBlocks->size;
  pInserter->numOfTables = pBlocks->numOfTables;
  SWAP(pInserter->pData, pBlocks->pData, char*);
  return pSink;
}

98
/*
 * Copy the plan node's column schema into dataBlockSchema.
 *
 * numOfCols is written before the allocation is attempted, so on failure the
 * output holds the column count together with a NULL pSchema.
 *
 * @return true when the schema array was allocated and copied, false when the
 *         allocation failed.
 */
static bool toDataBlockSchema(SQueryPlanNode* pPlanNode, SDataBlockSchema* dataBlockSchema) {
  const size_t bytes = sizeof(SSlotSchema) * pPlanNode->numOfCols;

  dataBlockSchema->numOfCols = pPlanNode->numOfCols;
  dataBlockSchema->pSchema = malloc(bytes);
  if (NULL != dataBlockSchema->pSchema) {
    memcpy(dataBlockSchema->pSchema, pPlanNode->pSchema, bytes);
    return true;
  }
  return false;
}

/*
 * Clone the expression array `src` into a newly created array stored in *dst.
 *
 * A NULL or empty `src` counts as success and leaves *dst untouched.
 * NOTE(review): when copyAllExprInfo fails, *dst still points at the array
 * created here; it is not released in this function.
 *
 * @return true on success, false when the array could not be created or the
 *         copy did not return TSDB_CODE_SUCCESS.
 */
static bool cloneExprArray(SArray** dst, SArray* src) {
  size_t numOfExprs = (NULL == src) ? 0 : taosArrayGetSize(src);
  if (0 == numOfExprs) {
    return true;
  }

  *dst = taosArrayInit(numOfExprs, POINTER_BYTES);
  if (NULL == *dst) {
    return false;
  }
  return TSDB_CODE_SUCCESS == copyAllExprInfo(*dst, src, true);
}

123
/*
 * Allocate a zero-filled physical node of `size` bytes and initialize the parts
 * common to every operator: type, name, target schema and target expressions.
 *
 * The schema is copied before the expressions are cloned so that every failure
 * path can release what was already allocated with plain free(). Previously a
 * schema failure after a successful expression clone leaked node->pTargets.
 * NOTE(review): if cloneExprArray fails after creating its array, that array is
 * still not released here.
 *
 * @param pPlanNode  logical plan node supplying the schema and expressions.
 * @param type       operator type stored in info.type and used to pick info.name.
 * @param size       allocation size in bytes (sizeof the concrete node struct).
 * @return the new node, or NULL when the schema or expressions could not be
 *         copied. Failure of the node allocation itself is raised through
 *         validPointer.
 */
static SPhyNode* initPhyNode(SQueryPlanNode* pPlanNode, int32_t type, int32_t size) {
  SPhyNode* node = (SPhyNode*)validPointer(calloc(1, size));
  node->info.type = type;
  node->info.name = opTypeToOpName(type);

  if (!toDataBlockSchema(pPlanNode, &(node->targetSchema))) {
    free(node);
    return NULL;
  }
  if (!cloneExprArray(&node->pTargets, pPlanNode->pExpr)) {
    free(node->targetSchema.pSchema);
    free(node);
    return NULL;
  }
  return node;
}

134 135 136 137 138 139 140 141 142 143 144
/*
 * Create a scan-type physical node and record which table it reads.
 *
 * @param pPlanNode  logical plan node the scan is derived from.
 * @param pTable     table info supplying the uid and table type.
 * @param type       operator type of the scan.
 * @param size       allocation size in bytes (sizeof the concrete scan struct).
 * @return the new node, or NULL when initPhyNode failed. The result used to be
 *         dereferenced unconditionally.
 */
static SPhyNode* initScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t type, int32_t size) {
  SScanPhyNode* node = (SScanPhyNode*)initPhyNode(pPlanNode, type, size);
  if (NULL == node) {
    return NULL;
  }
  node->uid = pTable->pMeta->pTableMeta->uid;
  node->tableType = pTable->pMeta->pTableMeta->tableType;
  return (SPhyNode*)node;
}

/* Create a plain SScanPhyNode-sized scan node for operator `op`, with no extra fields. */
static SPhyNode* createPseudoScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t op) {
  const int32_t nodeSize = sizeof(SScanPhyNode);
  return initScanNode(pPlanNode, pTable, op, nodeSize);
}

145
/* Create an OP_TagScan node; the table info is taken from the plan node's pExtInfo. */
static SPhyNode* createTagScanNode(SQueryPlanNode* pPlanNode) {
  return createPseudoScanNode(pPlanNode, (SQueryTableInfo*)pPlanNode->pExtInfo, OP_TagScan);
}

/* Scan flag for a table scan. Placeholder: always MAIN_SCAN, inputs are not inspected yet. */
static uint8_t getScanFlag(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
  (void)pPlanNode;
  (void)pTable;
  // todo
  return MAIN_SCAN;
}

/*
 * Create a table scan node for operator `op`, adding the scan flag and the
 * table's time window on top of the common scan fields.
 *
 * @return the new node, or NULL when the underlying node could not be created.
 *         The NULL case is now propagated instead of dereferenced.
 */
static SPhyNode* createUserTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t op) {
  STableScanPhyNode* node = (STableScanPhyNode*)initScanNode(pPlanNode, pTable, op, sizeof(STableScanPhyNode));
  if (NULL == node) {
    return NULL;
  }
  node->scanFlag = getScanFlag(pPlanNode, pTable);
  node->window = pTable->window;
  // todo tag cond
  return (SPhyNode*)node;
}

/* Create an OP_TableScan node for a table that is scanned within a single subplan. */
static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
  SPhyNode* pScan = createUserTableScanNode(pPlanNode, pTable, OP_TableScan);
  return pScan;
}

/* Whether pTable is a system table. Placeholder: always false, the table is not inspected yet. */
static bool isSystemTable(SQueryTableInfo* pTable) {
  (void)pTable;
  // todo
  return false;
}

/* Whether the plan node requires a sequential scan. Placeholder: always false. */
static bool needSeqScan(SQueryPlanNode* pPlanNode) {
  (void)pPlanNode;
  // todo
  return false;
}

/*
 * Create the scan node used inside each per-vgroup subplan.
 *
 * System tables get a pseudo scan (OP_SystemTableScan). User tables get
 * OP_TableSeqScan when a sequential scan is needed, OP_DataBlocksOptScan otherwise.
 */
static SPhyNode* createMultiTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
  if (isSystemTable(pTable)) {
    return createPseudoScanNode(pPlanNode, pTable, OP_SystemTableScan);
  }

  int32_t op = needSeqScan(pPlanNode) ? OP_TableSeqScan : OP_DataBlocksOptScan;
  return createUserTableScanNode(pPlanNode, pTable, op);
}

/*
 * Allocate a subplan, link it into the DAG and make it the current subplan.
 *
 * Steps, in order:
 *  1. take the id from pCxt->nextId and advance the subplanId counter;
 *  2. if a current subplan exists, place the new one a level below it and link
 *     the two through pChildren / pParents;
 *  3. append the new subplan to the DAG's list for its level, creating that
 *     list when the level has not been seen before;
 *  4. update pCxt->pCurrentSubplan and the DAG's subplan count.
 *
 * Allocation failures of the subplan and of the arrays are raised through
 * validPointer.
 * NOTE(review): the results of the taosArrayPush calls are not checked.
 */
static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
  SSubplan* pNew = validPointer(calloc(1, sizeof(SSubplan)));
  pNew->id = pCxt->nextId;
  pCxt->nextId.subplanId += 1;
  pNew->type = type;
  pNew->level = 0;

  SSubplan* pParent = pCxt->pCurrentSubplan;
  if (NULL != pParent) {
    pNew->level = pParent->level + 1;
    if (NULL == pParent->pChildren) {
      pParent->pChildren = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
    }
    taosArrayPush(pParent->pChildren, &pNew);

    pNew->pParents = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
    taosArrayPush(pNew->pParents, &pCxt->pCurrentSubplan);
  }

  SArray* pLevelPlans = NULL;
  if (pNew->level < taosArrayGetSize(pCxt->pDag->pSubplans)) {
    pLevelPlans = taosArrayGetP(pCxt->pDag->pSubplans, pNew->level);
  } else {
    // First subplan on this level: create the per-level list and register it.
    pLevelPlans = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
    taosArrayPush(pCxt->pDag->pSubplans, &pLevelPlans);
  }
  taosArrayPush(pLevelPlans, &pNew);

  pCxt->pDag->numOfSubplans += 1;
  pCxt->pCurrentSubplan = pNew;
  return pNew;
}

X
Xiaoyu Wang 已提交
218 219 220 221
/*
 * Fill execNode from an SVgroupInfo: the vgroup id becomes the node id, inUse is
 * reset to 0, and the first numOfEps endpoint addresses are copied.
 */
static void vgroupInfoToEpSet(const SVgroupInfo* vg, SQueryNodeAddr* execNode) {
  execNode->nodeId = vg->vgId;
  execNode->inUse = 0;  // todo
  execNode->numOfEps = vg->numOfEps;

  int8_t ep = 0;
  while (ep < vg->numOfEps) {
    execNode->epAddr[ep] = vg->epAddr[ep];
    ++ep;
  }
}

X
Xiaoyu Wang 已提交
228 229 230 231
/*
 * Fill execNode from an SVgroupMsg: the vgroup id becomes the node id, inUse is
 * reset to 0, and the first numOfEps endpoint addresses are copied.
 */
static void vgroupMsgToEpSet(const SVgroupMsg* vg, SQueryNodeAddr* execNode) {
  execNode->nodeId = vg->vgId;
  execNode->inUse = 0;  // todo
  execNode->numOfEps = vg->numOfEps;

  int8_t ep = 0;
  while (ep < vg->numOfEps) {
    execNode->epAddr[ep] = vg->epAddr[ep];
    ++ep;
  }
}

238
/*
 * Create one QUERY_TYPE_SCAN subplan per vgroup of the table. Each gets the
 * vgroup's endpoints, a multi-table scan node and a dispatcher sink.
 *
 * The current subplan is saved before each subplan is created and restored
 * afterwards, so all of the new subplans hang under the same parent.
 *
 * @return the template id in effect before the call; the counter in
 *         pCxt->nextId is advanced by one.
 */
static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
  SVgroupsInfo* vgroupList = pTable->pMeta->vgroupList;
  for (int32_t vgIdx = 0; vgIdx < vgroupList->numOfVgroups; ++vgIdx) {
    STORE_CURRENT_SUBPLAN(pCxt);
    SSubplan* pScanPlan = initSubplan(pCxt, QUERY_TYPE_SCAN);
    vgroupMsgToEpSet(&(vgroupList->vgroups[vgIdx]), &pScanPlan->execNode);
    pScanPlan->pNode = createMultiTableScanNode(pPlanNode, pTable);
    pScanPlan->pDataSink = createDataDispatcher(pCxt, pPlanNode);
    RECOVERY_CURRENT_SUBPLAN(pCxt);
  }

  uint64_t templateId = pCxt->nextId.templateId;
  pCxt->nextId.templateId += 1;
  return templateId;
}

251 252 253 254 255
/*
 * Create an OP_Exchange node that reads from the subplans built for
 * srcTemplateId.
 *
 * @return the new node, or NULL when initPhyNode failed. The result used to be
 *         dereferenced unconditionally.
 */
static SPhyNode* createExchangeNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, uint64_t srcTemplateId) {
  SExchangePhyNode* node = (SExchangePhyNode*)initPhyNode(pPlanNode, OP_Exchange, sizeof(SExchangePhyNode));
  if (NULL == node) {
    return NULL;
  }
  node->srcTemplateId = srcTemplateId;
  return (SPhyNode*)node;
}
256

257 258 259
/* True when the table is a super table, i.e. its scan is split into per-vgroup subplans. */
static bool needMultiNodeScan(SQueryTableInfo* pTable) {
  // todo system table, for instance, user_tables
  if (TSDB_SUPER_TABLE == pTable->pMeta->pTableMeta->tableType) {
    return true;
  }
  return false;
}

262
/*
 * Create the physical node for a table scan. A table that needs a multi-node
 * scan is split into per-vgroup subplans and represented here by an exchange
 * node; any other table gets a direct OP_TableScan node.
 */
static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
  SQueryTableInfo* pTable = (SQueryTableInfo*)pPlanNode->pExtInfo;
  if (!needMultiNodeScan(pTable)) {
    return createSingleTableScanNode(pPlanNode, pTable);
  }

  uint64_t srcTemplateId = splitSubplanByTable(pCxt, pPlanNode, pTable);
  return createExchangeNode(pCxt, pPlanNode, srcTemplateId);
}

/*
 * Recursively translate a logical plan node, and its children, into physical
 * nodes.
 *
 * QNODE_PROJECT and QNODE_MODIFY produce no physical operator, and the scan
 * constructors may return NULL. In all of those cases NULL is returned and the
 * children are not visited. Previously the code wrote through the NULL node
 * whenever the plan node had children.
 *
 * Failure to allocate the child array is raised through validPointer, like the
 * other taosArrayInit calls in this file. A child that yields no physical node
 * is skipped rather than dereferenced.
 *
 * @return the physical node for pPlanNode, or NULL when none was produced.
 */
static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
  SPhyNode* node = NULL;
  switch (pPlanNode->info.type) {
    case QNODE_TAGSCAN:
      node = createTagScanNode(pPlanNode);
      break;
    case QNODE_TABLESCAN:
      node = createTableScanNode(pCxt, pPlanNode);
      break;
    case QNODE_PROJECT:
      // todo: no physical operator is generated for projections yet
      /* fallthrough */
    case QNODE_MODIFY:
      // Insert is not an operator in a physical plan.
      break;
    default:
      assert(false);
  }

  if (NULL == node) {
    return NULL;
  }

  size_t numOfChildren = (NULL == pPlanNode->pChildren) ? 0 : taosArrayGetSize(pPlanNode->pChildren);
  if (numOfChildren > 0) {
    node->pChildren = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
    for (size_t i = 0; i < numOfChildren; ++i) {
      SPhyNode* child = createPhyNode(pCxt, taosArrayGetP(pPlanNode->pChildren, i));
      if (NULL == child) {
        continue;
      }
      child->pParent = node;
      taosArrayPush(node->pChildren, &child);
    }
  }

  return node;
}

H
Haojun Liao 已提交
301 302 303 304 305
/*
 * Create one QUERY_TYPE_MODIFY subplan per entry in the payload array found in
 * the plan node's pExtInfo. Each subplan has no operator tree (pNode is NULL),
 * only an insert sink built from that entry's data blocks.
 *
 * The current subplan is saved before each subplan is created and restored
 * afterwards.
 */
static void splitModificationOpSubPlan(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
  SDataPayloadInfo* pPayload = (SDataPayloadInfo*)pPlanNode->pExtInfo;
  size_t numOfVgroups = taosArrayGetSize(pPayload->payload);

  size_t vgIdx = 0;
  while (vgIdx < numOfVgroups) {
    STORE_CURRENT_SUBPLAN(pCxt);
    SSubplan* pModifyPlan = initSubplan(pCxt, QUERY_TYPE_MODIFY);
    SVgDataBlocks* pBlocks = (SVgDataBlocks*)taosArrayGetP(pPayload->payload, vgIdx);

    vgroupInfoToEpSet(&pBlocks->vg, &pModifyPlan->execNode);
    pModifyPlan->pDataSink = createDataInserter(pCxt, pBlocks);
    pModifyPlan->pNode = NULL;
    pModifyPlan->type = QUERY_TYPE_MODIFY;
    pModifyPlan->msgType = pPayload->msgType;
    pModifyPlan->id.queryId = pCxt->pDag->queryId;

    RECOVERY_CURRENT_SUBPLAN(pCxt);
    ++vgIdx;
  }
}

321
/*
 * Build the subplans for the plan rooted at pRoot.
 *
 * A QNODE_MODIFY root is split into per-vgroup modify subplans. Any other root
 * gets a single QUERY_TYPE_MERGE subplan holding the physical tree and a
 * dispatcher sink; subplans needed below it are created while that tree is built.
 */
static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) {
  if (QNODE_MODIFY == pRoot->info.type) {
    splitModificationOpSubPlan(pCxt, pRoot);
    return;
  }

  SSubplan* pMergePlan = initSubplan(pCxt, QUERY_TYPE_MERGE);
  pCxt->nextId.templateId += 1;

  pMergePlan->msgType = TDMT_VND_QUERY;
  pMergePlan->pNode = createPhyNode(pCxt, pRoot);
  pMergePlan->pDataSink = createDataDispatcher(pCxt, pRoot);
  // todo deal subquery
}

335
/*
 * Build the query DAG (subplans grouped by level) for a logical plan.
 *
 * @param pQueryNode  root of the logical plan.
 * @param pCatalog    catalog handle stored in the planning context.
 * @param pDag        output; receives the DAG. It is set to NULL up front, so a
 *                    failure of the very first allocation no longer leaves it
 *                    unassigned. On a later failure it still points at the
 *                    partially built DAG (presumably for the caller to
 *                    release -- confirm against callers).
 * @param requestId   stored as the DAG's queryId and as the queryId of generated
 *                    subplan ids.
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_FAILED with terrno set to the code
 *         that was thrown while planning.
 */
int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag, uint64_t requestId) {
  *pDag = NULL;

  TRY(TSDB_MAX_TAG_CONDITIONS) {
    SPlanContext context = {
      .pCatalog = pCatalog,
      .pDag = validPointer(calloc(1, sizeof(SQueryDag))),
      .pCurrentSubplan = NULL,
      .nextId = {.queryId = requestId},
    };

    *pDag = context.pDag;
    context.pDag->queryId = requestId;

    context.pDag->pSubplans = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
    createSubplanByLevel(&context, pQueryNode);
  } CATCH(code) {
    CLEANUP_EXECUTE();
    terrno = code;
    return TSDB_CODE_FAILED;
  } END_TRY
  return TSDB_CODE_SUCCESS;
}
X
Xiaoyu Wang 已提交
356

D
dapan1121 已提交
357
/*
 * Assign an execution node to the subplans of a template. Not implemented yet:
 * the arguments are ignored.
 *
 * @return TSDB_CODE_SUCCESS. The function previously fell off the end without
 *         returning a value, which is undefined behavior as soon as a caller
 *         reads the result.
 */
int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) {
  (void)subplan;
  (void)templateId;
  (void)ep;
  //todo
  return TSDB_CODE_SUCCESS;
}