physicalPlan.c 11.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "plannerInt.h"
X
Xiaoyu Wang 已提交
17
#include "exception.h"
18
#include "parser.h"
19

20 21 22
/*
 * Paired helpers for temporarily switching the context's current subplan.
 *
 * STORE_CURRENT_SUBPLAN declares a local named `_` holding the current
 * subplan; RECOVERY_CURRENT_SUBPLAN writes `_` back. Both must be used in the
 * same scope. The argument is parenthesized so that any pointer expression
 * (for example `&ctx`) expands to a valid member access.
 */
#define STORE_CURRENT_SUBPLAN(cxt) SSubplan* _ = (cxt)->pCurrentSubplan
#define RECOVERY_CURRENT_SUBPLAN(cxt) (cxt)->pCurrentSubplan = _

23 24 25 26 27 28 29
// State shared by the helpers in this file while a physical plan is built.
typedef struct SPlanContext {
  // Catalog handle passed in by the caller of createDag.
  struct SCatalog* pCatalog;
  // DAG that receives every subplan created through initSubplan.
  struct SQueryDag* pDag;
  // Subplan that newly created subplans are linked under as children;
  // initSubplan makes each new subplan the current one.
  SSubplan* pCurrentSubplan;
  // Id copied into the next subplan; initSubplan increments its subplanId
  // after each use.
  SSubplanId nextId;
} SPlanContext;

30 31 32 33 34 35
// Operator-name table indexed by operator type. Slot 0 is the "Unknown"
// placeholder; the remaining entries come from plannerOp.h, which is included
// with INCLUDE_AS_NAME defined (presumably expanding each operator to its
// name string -- confirm against plannerOp.h).
static const char* gOpName[] = {
  "Unknown",
#define INCLUDE_AS_NAME
#include "plannerOp.h"
#undef INCLUDE_AS_NAME
};
X
Xiaoyu Wang 已提交
36

37
static void* validPointer(void* p) {
X
Xiaoyu Wang 已提交
38 39 40 41 42 43
  if (NULL == p) {
    THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
  }
  return p;
}

44 45 46 47
/*
 * Translate an operator type code into its printable name.
 *
 * type: index into gOpName.
 * Returns the matching table entry. A negative or too-large type yields the
 * placeholder in slot 0 ("Unknown") instead of reading outside the table.
 */
const char* opTypeToOpName(int32_t type) {
  const size_t numOfNames = sizeof(gOpName) / sizeof(gOpName[0]);
  if (type < 0 || (size_t)type >= numOfNames) {
    return gOpName[0];
  }
  return gOpName[type];
}

48 49
/*
 * Look up the operator type whose name equals `name`.
 *
 * The search starts at index 1, so the "Unknown" placeholder in slot 0 is
 * never matched by name.
 *
 * Returns the index of the matching gOpName entry, or OP_Unknown when name is
 * NULL or no entry matches.
 */
int32_t opNameToOpType(const char* name) {
  if (NULL == name) {
    // strcmp on a NULL pointer is undefined behavior.
    return OP_Unknown;
  }
  const size_t numOfNames = sizeof(gOpName) / sizeof(gOpName[0]);
  for (size_t i = 1; i < numOfNames; ++i) {
    if (0 == strcmp(name, gOpName[i])) {
      return (int32_t)i;
    }
  }
  return OP_Unknown;
}
X
Xiaoyu Wang 已提交
56

57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
/*
 * Map a data-sink type code to its printable name.
 *
 * Returns "Dispatch" for DSINK_Dispatch, "Insert" for DSINK_Insert and
 * "Unknown" for every other value.
 */
const char* dsinkTypeToDsinkName(int32_t type) {
  if (DSINK_Dispatch == type) {
    return "Dispatch";
  }
  if (DSINK_Insert == type) {
    return "Insert";
  }
  return "Unknown";
}

/*
 * Map a data-sink name back to its type code.
 *
 * Returns DSINK_Dispatch for "Dispatch", DSINK_Insert for "Insert" and
 * DSINK_Unknown for any other string. The comparison is case-sensitive.
 */
int32_t dsinkNameToDsinkType(const char* name) {
  const bool isDispatch = (strcmp(name, "Dispatch") == 0);
  if (isDispatch) {
    return DSINK_Dispatch;
  }
  const bool isInsert = (strcmp(name, "Insert") == 0);
  if (isInsert) {
    return DSINK_Insert;
  }
  return DSINK_Unknown;
}

X
Xiaoyu Wang 已提交
78
static SDataSink* initDataSink(int32_t type, int32_t size) {
79
  SDataSink* sink = (SDataSink*)validPointer(calloc(1, size));
X
Xiaoyu Wang 已提交
80
  sink->info.type = type;
81
  sink->info.name = dsinkTypeToDsinkName(type);
X
Xiaoyu Wang 已提交
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
  return sink;
}

/*
 * Create a dispatcher sink (type DSINK_Dispatch) sized as SDataDispatcher.
 * Neither pCxt nor pPlanNode is read by the current implementation.
 */
static SDataSink* createDataDispatcher(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
  return initDataSink(DSINK_Dispatch, sizeof(SDataDispatcher));
}

/*
 * Create an inserter sink (type DSINK_Insert) from one vgroup's data blocks.
 *
 * numOfTables and size are copied from pBlocks; the pData pointers of the new
 * inserter and of pBlocks are exchanged through SWAP. pCxt is not read.
 */
static SDataSink* createDataInserter(SPlanContext* pCxt, SVgDataBlocks* pBlocks) {
  SDataSink* pSink = initDataSink(DSINK_Insert, sizeof(SDataInserter));
  SDataInserter* pInserter = (SDataInserter*)pSink;
  pInserter->size = pBlocks->size;
  pInserter->numOfTables = pBlocks->numOfTables;
  SWAP(pInserter->pData, pBlocks->pData, char*);
  return pSink;
}

98
/*
 * Copy the column schema of a logical plan node into a data-block schema.
 *
 * numOfCols is always written; pSchema receives a freshly malloc'ed copy of
 * the node's pSchema array.
 *
 * Returns true on success, false when the allocation fails (pSchema is then
 * NULL).
 */
static bool toDataBlockSchema(SQueryPlanNode* pPlanNode, SDataBlockSchema* dataBlockSchema) {
  dataBlockSchema->numOfCols = pPlanNode->numOfCols;
  dataBlockSchema->pSchema = malloc(sizeof(SSlotSchema) * pPlanNode->numOfCols);
  if (NULL != dataBlockSchema->pSchema) {
    memcpy(dataBlockSchema->pSchema, pPlanNode->pSchema, sizeof(SSlotSchema) * pPlanNode->numOfCols);
    return true;
  }
  return false;
}

/*
 * Clone an expression array.
 *
 * A NULL or empty src is treated as success and leaves *dst untouched.
 * Otherwise *dst is set to a new array (NULL if it cannot be created) and the
 * expressions are copied into it with copyAllExprInfo.
 *
 * Returns true on success, false when the array cannot be created or the copy
 * does not report TSDB_CODE_SUCCESS.
 */
static bool cloneExprArray(SArray** dst, SArray* src) {
  const size_t numOfExprs = (NULL == src) ? 0 : taosArrayGetSize(src);
  if (0 == numOfExprs) {
    return true;
  }

  SArray* pCopy = taosArrayInit(numOfExprs, POINTER_BYTES);
  *dst = pCopy;
  if (NULL == pCopy) {
    return false;
  }
  return TSDB_CODE_SUCCESS == copyAllExprInfo(pCopy, src, true);
}

123
/*
 * Allocate a zero-filled physical node of `size` bytes and populate the parts
 * shared by all operators: type, printable name, target schema and target
 * expressions, the latter two taken from the logical node.
 *
 * Returns the new node, or NULL when the schema or the expression array
 * cannot be built. Failure of the node allocation itself is reported by
 * validPointer.
 */
static SPhyNode* initPhyNode(SQueryPlanNode* pPlanNode, int32_t type, int32_t size) {
  SPhyNode* node = (SPhyNode*)validPointer(calloc(1, size));
  node->info.type = type;
  node->info.name = opTypeToOpName(type);

  // The schema is built first: it owns a single malloc'ed buffer, so it can be
  // released with free() if the expression clone fails afterwards. Cloning
  // first would leave the cloned targets unreleased on a schema failure.
  if (!toDataBlockSchema(pPlanNode, &(node->targetSchema))) {
    free(node);
    return NULL;
  }
  if (!cloneExprArray(&node->pTargets, pPlanNode->pExpr)) {
    // NOTE(review): anything cloneExprArray allocated before failing is still
    // not released here -- confirm how a partially copied array is destroyed.
    free(node->targetSchema.pSchema);
    free(node);
    return NULL;
  }
  return node;
}

134 135 136 137 138 139 140 141 142 143 144
/*
 * Create a scan-type physical node and record which table it reads.
 *
 * uid and tableType are copied from the table meta attached to pTable.
 *
 * Returns the new node, or NULL when initPhyNode could not build it.
 */
static SPhyNode* initScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t type, int32_t size) {
  SScanPhyNode* node = (SScanPhyNode*)initPhyNode(pPlanNode, type, size);
  if (NULL == node) {
    // initPhyNode reports schema/expression failures with NULL.
    return NULL;
  }
  node->uid = pTable->pMeta->pTableMeta->uid;
  node->tableType = pTable->pMeta->pTableMeta->tableType;
  return (SPhyNode*)node;
}

/*
 * Create a scan node of operator type `op` that carries only the common scan
 * fields (sized as SScanPhyNode).
 */
static SPhyNode* createPseudoScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t op) {
  SPhyNode* pScan = initScanNode(pPlanNode, pTable, op, sizeof(SScanPhyNode));
  return pScan;
}

145
/*
 * Create an OP_TagScan node; the table description is taken from the logical
 * node's pExtInfo.
 */
static SPhyNode* createTagScanNode(SQueryPlanNode* pPlanNode) {
  return createPseudoScanNode(pPlanNode, (SQueryTableInfo*)pPlanNode->pExtInfo, OP_TagScan);
}

/*
 * Choose the scan flag for a table scan.
 * todo: placeholder -- always answers MASTER_SCAN and ignores both arguments.
 */
static uint8_t getScanFlag(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
  const uint8_t flag = MASTER_SCAN;
  return flag;
}

/*
 * Create a table-scan node of operator type `op` for a user table.
 *
 * On top of the common scan fields, the node receives the scan flag from
 * getScanFlag and the window stored in pTable.
 *
 * Returns the new node, or NULL when initScanNode yields NULL.
 */
static SPhyNode* createUserTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t op) {
  STableScanPhyNode* node = (STableScanPhyNode*)initScanNode(pPlanNode, pTable, op, sizeof(STableScanPhyNode));
  if (NULL == node) {
    // Pass the failure on instead of writing through a NULL pointer.
    return NULL;
  }
  node->scanFlag = getScanFlag(pPlanNode, pTable);
  node->window = pTable->window;
  // todo tag cond
  return (SPhyNode*)node;
}


/*
 * Tell whether pTable is a system table.
 * todo: placeholder -- always answers false and ignores pTable.
 */
static bool isSystemTable(SQueryTableInfo* pTable) {
  const bool systemTable = false;
  return systemTable;
}

/*
 * Tell whether the plan node requires a sequential table scan.
 * todo: placeholder -- always answers false and ignores pPlanNode.
 */
static bool needSeqScan(SQueryPlanNode* pPlanNode) {
  const bool seqScan = false;
  return seqScan;
}

/*
 * Create the scan node used by each per-vgroup subplan.
 *
 * System tables get an OP_SystemTableScan pseudo scan. User tables get
 * OP_TableSeqScan when needSeqScan says so and OP_DataBlocksOptScan otherwise.
 */
static SPhyNode* createMultiTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
  if (isSystemTable(pTable)) {
    return createPseudoScanNode(pPlanNode, pTable, OP_SystemTableScan);
  }
  const int32_t op = needSeqScan(pPlanNode) ? OP_TableSeqScan : OP_DataBlocksOptScan;
  return createUserTableScanNode(pPlanNode, pTable, op);
}

/*
 * Create a subplan of the given type, link it into the DAG and make it the
 * context's current subplan.
 *
 * The new subplan takes its id from pCxt->nextId (whose subplanId is then
 * incremented). When a current subplan exists, the new one is placed one
 * level below it and the two are linked as parent and child. The subplan is
 * appended to the DAG's per-level list, a new level being added when needed.
 *
 * Every allocation and every array push is checked through validPointer, so a
 * failure is raised instead of silently dropping a link.
 * NOTE(review): taosArrayPush is expected to return NULL when it cannot grow
 * the array -- confirm against the array API.
 */
static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
  SSubplan* subplan = validPointer(calloc(1, sizeof(SSubplan)));
  subplan->id = pCxt->nextId;
  ++(pCxt->nextId.subplanId);

  subplan->type  = type;
  subplan->level = 0;

  SSubplan* parent = pCxt->pCurrentSubplan;
  if (NULL != parent) {
    subplan->level = parent->level + 1;
    if (NULL == parent->pChildren) {
      parent->pChildren = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
    }

    validPointer(taosArrayPush(parent->pChildren, &subplan));
    subplan->pParents = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
    validPointer(taosArrayPush(subplan->pParents, &parent));
  }

  // Find the list for this level, adding a new one when the DAG has none yet.
  SArray* currentLevel;
  if (subplan->level >= taosArrayGetSize(pCxt->pDag->pSubplans)) {
    currentLevel = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
    validPointer(taosArrayPush(pCxt->pDag->pSubplans, &currentLevel));
  } else {
    currentLevel = taosArrayGetP(pCxt->pDag->pSubplans, subplan->level);
  }

  validPointer(taosArrayPush(currentLevel, &subplan));
  pCxt->pCurrentSubplan = subplan;
  ++(pCxt->pDag->numOfSubplans);
  return subplan;
}

X
Xiaoyu Wang 已提交
215 216 217 218
/*
 * Fill a query node address from an SVgroupInfo: node id, endpoint count and
 * the first numOfEps endpoint addresses. inUse is reset to 0.
 */
static void vgroupInfoToEpSet(const SVgroupInfo* vg, SQueryNodeAddr* execNode) {
  execNode->nodeId = vg->vgId;
  execNode->inUse = 0; // todo
  execNode->numOfEps = vg->numOfEps;

  int8_t idx = 0;
  while (idx < vg->numOfEps) {
    execNode->epAddr[idx] = vg->epAddr[idx];
    ++idx;
  }
}

X
Xiaoyu Wang 已提交
225 226 227 228
/*
 * Fill a query node address from an SVgroupMsg: node id, endpoint count and
 * the first numOfEps endpoint addresses. inUse is reset to 0.
 */
static void vgroupMsgToEpSet(const SVgroupMsg* vg, SQueryNodeAddr* execNode) {
  execNode->nodeId = vg->vgId;
  execNode->inUse = 0; // todo
  execNode->numOfEps = vg->numOfEps;

  for (int8_t ep = 0; ep < vg->numOfEps; ++ep) {
    execNode->epAddr[ep] = vg->epAddr[ep];
  }
}

235
/*
 * Create one scan subplan per vgroup of the table.
 *
 * Each subplan is addressed to its vgroup, receives a multi-table scan node
 * and a dispatcher sink, and is created with the context's current subplan
 * restored afterwards.
 *
 * Returns the context's templateId as it was on entry; the stored value is
 * incremented before returning.
 */
static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
  SVgroupsInfo* vgroupList = pTable->pMeta->vgroupList;
  for (int32_t vgIdx = 0; vgIdx < vgroupList->numOfVgroups; ++vgIdx) {
    STORE_CURRENT_SUBPLAN(pCxt);
    SSubplan* pScanPlan = initSubplan(pCxt, QUERY_TYPE_SCAN);
    vgroupMsgToEpSet(&(vgroupList->vgroups[vgIdx]), &pScanPlan->execNode);
    pScanPlan->pNode = createMultiTableScanNode(pPlanNode, pTable);
    pScanPlan->pDataSink = createDataDispatcher(pCxt, pPlanNode);
    RECOVERY_CURRENT_SUBPLAN(pCxt);
  }

  const uint64_t templateId = pCxt->nextId.templateId;
  ++(pCxt->nextId.templateId);
  return templateId;
}

248 249 250 251 252
/*
 * Create an OP_Exchange node that refers to a group of source subplans.
 *
 * srcTemplateId: template id stored in the node.
 *
 * Returns the new node, or NULL when initPhyNode could not build it. pCxt is
 * not read.
 */
static SPhyNode* createExchangeNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, uint64_t srcTemplateId) {
  SExchangePhyNode* node = (SExchangePhyNode*)initPhyNode(pPlanNode, OP_Exchange, sizeof(SExchangePhyNode));
  if (NULL == node) {
    // initPhyNode reports schema/expression failures with NULL.
    return NULL;
  }
  node->srcTemplateId = srcTemplateId;
  return (SPhyNode*)node;
}
253

254 255 256
/*
 * Tell whether scanning pTable has to be split into several subplans.
 * Currently true exactly when the table type is TSDB_SUPER_TABLE.
 */
static bool needMultiNodeScan(SQueryTableInfo* pTable) {
  // todo system table, for instance, user_tables
  if (TSDB_SUPER_TABLE == pTable->pMeta->pTableMeta->tableType) {
    return true;
  }
  return false;
}

D
dapan 已提交
259 260 261 262 263 264 265
/*
 * Create an OP_TableScan node for a table that is scanned in one subplan.
 *
 * As a side effect, the subplan's execution node is set from the first entry
 * of the table's vgroup list.
 */
static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, SSubplan* subplan) {
  SQueryNodeAddr* pExecNode = &subplan->execNode;
  vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[0]), pExecNode);

  SPhyNode* pScan = createUserTableScanNode(pPlanNode, pTable, OP_TableScan);
  return pScan;
}


266
/*
 * Create the physical node for a logical table scan.
 *
 * When needMultiNodeScan says the table must be split, per-vgroup subplans
 * are created and an exchange node referring to them is returned. Otherwise
 * a single table-scan node bound to the current subplan is returned.
 */
static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
  SQueryTableInfo* pTable = (SQueryTableInfo*)pPlanNode->pExtInfo;

  if (!needMultiNodeScan(pTable)) {
    return createSingleTableScanNode(pPlanNode, pTable, pCxt->pCurrentSubplan);
  }

  const uint64_t srcTemplateId = splitSubplanByTable(pCxt, pPlanNode, pTable);
  return createExchangeNode(pCxt, pPlanNode, srcTemplateId);
}

/*
 * Recursively translate a logical plan node and its children into physical
 * nodes.
 *
 * Returns the physical node for pPlanNode with its children attached, or NULL
 * when the logical node has no physical counterpart (QNODE_PROJECT for now,
 * QNODE_MODIFY) or the node could not be created.
 *
 * A NULL node or child is never dereferenced: children of a node without a
 * physical counterpart are not translated, and a child that yields NULL is
 * skipped.
 * NOTE(review): dropping the subtree below a QNODE_PROJECT is only a stopgap
 * until that operator is implemented.
 */
static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
  SPhyNode* node = NULL;
  switch (pPlanNode->info.type) {
    case QNODE_TAGSCAN:
      node = createTagScanNode(pPlanNode);
      break;
    case QNODE_TABLESCAN:
      node = createTableScanNode(pCxt, pPlanNode);
      break;
    case QNODE_PROJECT:
      // todo: no physical projection operator yet
      /* fallthrough */
    case QNODE_MODIFY:
      // Insert is not an operator in a physical plan.
      break;
    default:
      assert(false);
  }

  if (NULL == node) {
    // Nothing to attach children to.
    return NULL;
  }

  if (NULL == pPlanNode->pChildren) {
    return node;
  }
  const size_t numOfChildren = taosArrayGetSize(pPlanNode->pChildren);
  if (0 == numOfChildren) {
    return node;
  }

  node->pChildren = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
  for (size_t i = 0; i < numOfChildren; ++i) {
    SPhyNode* child = createPhyNode(pCxt, taosArrayGetP(pPlanNode->pChildren, i));
    if (NULL == child) {
      continue;
    }
    child->pParent = node;
    validPointer(taosArrayPush(node->pChildren, &child));
  }

  return node;
}

H
Haojun Liao 已提交
306 307 308 309 310
/*
 * Create one QUERY_TYPE_MODIFY subplan per entry of the payload attached to
 * a modification plan node.
 *
 * Each subplan is addressed to the vgroup of its data blocks, gets an
 * inserter sink built from those blocks and no physical node. The context's
 * current subplan is restored after every subplan.
 */
static void splitModificationOpSubPlan(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
  SDataPayloadInfo* pPayload = (SDataPayloadInfo*) pPlanNode->pExtInfo;
  const size_t numOfBlocks = taosArrayGetSize(pPayload->payload);

  for (int32_t idx = 0; idx < numOfBlocks; ++idx) {
    STORE_CURRENT_SUBPLAN(pCxt);
    SSubplan* pModifyPlan = initSubplan(pCxt, QUERY_TYPE_MODIFY);
    SVgDataBlocks* pBlocks = (SVgDataBlocks*)taosArrayGetP(pPayload->payload, idx);

    vgroupInfoToEpSet(&pBlocks->vg, &pModifyPlan->execNode);
    pModifyPlan->pNode      = NULL;
    pModifyPlan->pDataSink  = createDataInserter(pCxt, pBlocks);
    pModifyPlan->type       = QUERY_TYPE_MODIFY;
    pModifyPlan->msgType    = pPayload->msgType;
    pModifyPlan->id.queryId = pCxt->pDag->queryId;

    RECOVERY_CURRENT_SUBPLAN(pCxt);
  }
}

326
/*
 * Build the subplans for the plan rooted at pRoot.
 *
 * A QNODE_MODIFY root is split into per-vgroup modification subplans. Any
 * other root becomes one QUERY_TYPE_SCAN subplan with message type
 * TDMT_VND_QUERY, whose physical tree is built by createPhyNode and whose
 * sink is a dispatcher.
 */
static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) {
  if (QNODE_MODIFY == pRoot->info.type) {
    splitModificationOpSubPlan(pCxt, pRoot);
    return;
  }

  SSubplan* pRootPlan = initSubplan(pCxt, QUERY_TYPE_SCAN);
  ++(pCxt->nextId.templateId);

  pRootPlan->msgType   = TDMT_VND_QUERY;
  pRootPlan->pNode     = createPhyNode(pCxt, pRoot);
  pRootPlan->pDataSink = createDataDispatcher(pCxt, pRoot);
  // todo deal subquery
}

340
/*
 * Build the query DAG for a logical plan.
 *
 * pQueryNode: root of the logical plan.
 * pCatalog:   catalog handle stored in the planning context.
 * pDag:       receives the newly allocated DAG.
 * requestId:  stored as the DAG's queryId and as the queryId of generated
 *             subplan ids.
 *
 * Returns TSDB_CODE_SUCCESS on success. When an error is thrown during
 * planning, the registered cleanups run, terrno is set to the thrown code and
 * TSDB_CODE_FAILED is returned. *pDag is assigned as soon as the DAG itself
 * has been allocated, so it may refer to a partially built DAG on failure.
 */
int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag, uint64_t requestId) {
  TRY(TSDB_MAX_TAG_CONDITIONS) {
    SPlanContext planCxt = {
      .pCatalog = pCatalog,
      .pDag = validPointer(calloc(1, sizeof(SQueryDag))),
      .pCurrentSubplan = NULL,
      .nextId = {.queryId = requestId},
    };

    SQueryDag* pNewDag = planCxt.pDag;
    pNewDag->queryId = requestId;
    *pDag = pNewDag;

    pNewDag->pSubplans = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
    createSubplanByLevel(&planCxt, pQueryNode);
  } CATCH(code) {
    CLEANUP_EXECUTE();
    terrno = code;
    return TSDB_CODE_FAILED;
  } END_TRY
  return TSDB_CODE_SUCCESS;
}
X
Xiaoyu Wang 已提交
361

D
dapan1121 已提交
362
/*
 * Assign an execution node to the subplans created from a template.
 *
 * todo: not implemented yet -- no argument is read. A status is returned
 * explicitly because falling off the end of a non-void function leaves the
 * caller with an undefined value.
 */
int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) {
  //todo
  return TSDB_CODE_SUCCESS;
}