physicalPlan.c 11.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "plannerInt.h"
X
Xiaoyu Wang 已提交
17
#include "exception.h"
18
#include "parser.h"
19

20 21 22
// Save / restore the planner's "current subplan" around code that changes it.
// STORE_CURRENT_SUBPLAN declares a local named `_` holding the current subplan, so it may
// be used at most once per scope; RECOVERY_CURRENT_SUBPLAN must appear in that same scope.
// The argument is parenthesized so any pointer expression (e.g. `&context`) is accepted.
#define STORE_CURRENT_SUBPLAN(cxt) SSubplan* _ = (cxt)->pCurrentSubplan
#define RECOVERY_CURRENT_SUBPLAN(cxt) (cxt)->pCurrentSubplan = _

23 24 25 26 27 28 29
// Mutable state threaded through physical-plan construction (built in createDag()).
typedef struct SPlanContext {
  struct SCatalog* pCatalog;  // catalog handle passed to createDag(); not read by the code visible in this file
  struct SQueryDag* pDag;     // DAG under construction; initSubplan() appends subplans to pDag->pSubplans by level
  SSubplan* pCurrentSubplan;  // subplan that newly created subplans are attached to as children; NULL for the first one
  SSubplanId nextId;          // id given to the next subplan; subplanId is bumped per subplan, templateId per template
} SPlanContext;

30 31 32 33 34 35
// Operator names indexed by operator type; slot 0 is the "Unknown" placeholder.
// The remaining entries are produced by including plannerOp.h with INCLUDE_AS_NAME defined
// (presumably each operator entry then expands to its string name -- confirm in plannerOp.h).
static const char* gOpName[] = {
  "Unknown",
#define INCLUDE_AS_NAME
#include "plannerOp.h"
#undef INCLUDE_AS_NAME
};
X
Xiaoyu Wang 已提交
36

X
Xiaoyu Wang 已提交
37 38 39 40 41 42 43
// Pass-through guard for allocation results: hands back `p` when it is non-NULL and
// raises TSDB_CODE_TSC_OUT_OF_MEMORY through THROW otherwise.
static void* vailidPointer(void* p) {
  if (NULL != p) {
    return p;
  }
  THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
  return p;
}

44 45 46 47
// Maps an operator type to its printable name.
// Types outside the gOpName table (negative or past the last entry) yield the
// "Unknown" entry stored in slot 0 instead of reading out of bounds.
const char* opTypeToOpName(int32_t type) {
  const int32_t numOfOps = (int32_t)(sizeof(gOpName) / sizeof(gOpName[0]));
  if (type < 0 || type >= numOfOps) {
    return gOpName[0];
  }
  return gOpName[type];
}

48 49
// Reverse lookup of opTypeToOpName(): returns the index of `name` in gOpName.
// Slot 0 ("Unknown") is deliberately skipped; a NULL or unrecognised name yields OP_Unknown.
int32_t opNameToOpType(const char* name) {
  if (NULL == name) {
    return OP_Unknown;
  }
  // Computed once as a signed value so the loop comparison is not mixed signed/unsigned.
  const int32_t numOfOps = (int32_t)(sizeof(gOpName) / sizeof(gOpName[0]));
  for (int32_t i = 1; i < numOfOps; ++i) {
    if (0 == strcmp(name, gOpName[i])) {
      return i;
    }
  }
  return OP_Unknown;
}
X
Xiaoyu Wang 已提交
56

57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
// Returns the printable name of a data-sink type; any type other than
// DSINK_Dispatch / DSINK_Insert is reported as "Unknown".
const char* dsinkTypeToDsinkName(int32_t type) {
  if (DSINK_Dispatch == type) {
    return "Dispatch";
  }
  if (DSINK_Insert == type) {
    return "Insert";
  }
  return "Unknown";
}

// Reverse lookup of dsinkTypeToDsinkName(): "Dispatch" -> DSINK_Dispatch, "Insert" -> DSINK_Insert.
// A NULL or unrecognised name yields DSINK_Unknown (NULL used to be passed straight to strcmp).
int32_t dsinkNameToDsinkType(const char* name) {
  if (NULL == name) {
    return DSINK_Unknown;
  }
  if (0 == strcmp(name, "Dispatch")) {
    return DSINK_Dispatch;
  }
  if (0 == strcmp(name, "Insert")) {
    return DSINK_Insert;
  }
  return DSINK_Unknown;
}

X
Xiaoyu Wang 已提交
78 79 80
// Allocates a zero-filled data sink of `size` bytes and fills in its type and name.
// `size` is the size of the concrete sink struct; the result is returned as its SDataSink view.
// Allocation failure is raised by vailidPointer() rather than returned.
static SDataSink* initDataSink(int32_t type, int32_t size) {
  void* mem = vailidPointer(calloc(1, size));
  SDataSink* pSink = (SDataSink*)mem;
  pSink->info.name = dsinkTypeToDsinkName(type);
  pSink->info.type = type;
  return pSink;
}

// Builds a dispatch-type data sink. Neither pCxt nor pPlanNode is consulted yet;
// the sink carries only the common header set by initDataSink().
static SDataSink* createDataDispatcher(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
  return initDataSink(DSINK_Dispatch, sizeof(SDataDispatcher));
}

// Builds an insert-type data sink from one vgroup's data blocks.
// The table count and size are copied; the payload pointer is exchanged with the sink's
// pData via SWAP (presumably a plain pointer swap, which would leave pBlocks holding the
// sink's initial zeroed pointer -- confirm against the SWAP definition).
static SDataSink* createDataInserter(SPlanContext* pCxt, SVgDataBlocks* pBlocks) {
  SDataSink* pSink = initDataSink(DSINK_Insert, sizeof(SDataInserter));
  SDataInserter* pInserter = (SDataInserter*)pSink;
  SWAP(pInserter->pData, pBlocks->pData, char*);
  pInserter->size = pBlocks->size;
  pInserter->numOfTables = pBlocks->numOfTables;
  return pSink;
}

98
// Copies the column schema of a logical plan node into `dataBlockSchema`.
// Returns true on success and false when the schema buffer cannot be allocated.
// A node with zero columns succeeds with pSchema == NULL: malloc(0) is allowed to return
// NULL, which used to be misreported as an allocation failure.
static bool toDataBlockSchema(SQueryPlanNode* pPlanNode, SDataBlockSchema* dataBlockSchema) {
  dataBlockSchema->numOfCols = pPlanNode->numOfCols;
  dataBlockSchema->pSchema = NULL;
  if (0 == pPlanNode->numOfCols) {
    return true;
  }
  const size_t bytes = sizeof(SSlotSchema) * pPlanNode->numOfCols;
  dataBlockSchema->pSchema = malloc(bytes);
  if (NULL == dataBlockSchema->pSchema) {
    return false;
  }
  memcpy(dataBlockSchema->pSchema, pPlanNode->pSchema, bytes);
  return true;
}

// Copies the expression array `src` into a newly created array stored in `*dst`.
// A NULL or empty `src` is a successful no-op that leaves `*dst` untouched.
// Returns false when the destination array cannot be created or copyAllExprInfo() fails.
static bool cloneExprArray(SArray** dst, SArray* src) {
  const size_t numOfExprs = (NULL == src) ? 0 : taosArrayGetSize(src);
  if (0 == numOfExprs) {
    return true;
  }
  SArray* pCopy = taosArrayInit(numOfExprs, POINTER_BYTES);
  *dst = pCopy;
  if (NULL == pCopy) {
    return false;
  }
  return TSDB_CODE_SUCCESS == copyAllExprInfo(pCopy, src, true);
}

123
// Allocates a zero-filled physical node of `size` bytes (the size of the concrete node struct)
// and fills the common SPhyNode part: type, name, cloned target expressions and target schema.
// Never returns NULL: every caller in this file dereferences the result immediately, so a
// failure while cloning the targets or the schema is raised through vailidPointer(), the same
// way the node allocation itself already is, instead of being returned as NULL.
// NOTE(review): on that failure path only the node itself is freed; an array already created
// by cloneExprArray() is not released here -- needs the array destroy routine to fix properly.
static SPhyNode* initPhyNode(SQueryPlanNode* pPlanNode, int32_t type, int32_t size) {
  SPhyNode* node = (SPhyNode*)vailidPointer(calloc(1, size));
  node->info.type = type;
  node->info.name = opTypeToOpName(type);
  if (!cloneExprArray(&node->pTargets, pPlanNode->pExpr) || !toDataBlockSchema(pPlanNode, &(node->targetSchema))) {
    free(node);
    node = NULL;
  }
  // Raises TSDB_CODE_TSC_OUT_OF_MEMORY when initialisation failed above.
  return (SPhyNode*)vailidPointer(node);
}

134 135 136 137 138 139 140 141 142 143 144
// Creates a physical node of `size` bytes via initPhyNode() and fills the scan-specific
// fields (table uid and table type) from the table's meta data.
static SPhyNode* initScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t type, int32_t size) {
  SPhyNode* pBase = initPhyNode(pPlanNode, type, size);
  SScanPhyNode* pScan = (SScanPhyNode*)pBase;
  pScan->tableType = pTable->pMeta->pTableMeta->tableType;
  pScan->uid = pTable->pMeta->pTableMeta->uid;
  return pBase;
}

// Creates a scan node that carries only the common SScanPhyNode fields, tagged with operator `op`.
static SPhyNode* createPseudoScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t op) {
  SPhyNode* pScan = initScanNode(pPlanNode, pTable, op, sizeof(SScanPhyNode));
  return pScan;
}

145
// Creates the OP_TagScan node for a tag-scan plan node; the table info is taken from
// the plan node's pExtInfo.
static SPhyNode* createTagScanNode(SQueryPlanNode* pPlanNode) {
  return createPseudoScanNode(pPlanNode, (SQueryTableInfo*)pPlanNode->pExtInfo, OP_TagScan);
}

// Scan flag for a table scan. Placeholder: always MASTER_SCAN, both arguments are ignored.
static uint8_t getScanFlag(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
  // todo
  const uint8_t flag = MASTER_SCAN;
  return flag;
}

// Creates a table-scan node (STableScanPhyNode) for operator `op`, adding the scan flag
// and the table's time window on top of the common scan fields.
static SPhyNode* createUserTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t op) {
  SPhyNode* pBase = initScanNode(pPlanNode, pTable, op, sizeof(STableScanPhyNode));
  STableScanPhyNode* pScan = (STableScanPhyNode*)pBase;
  pScan->scanFlag = getScanFlag(pPlanNode, pTable);
  pScan->window = pTable->window;
  // todo tag cond
  return pBase;
}

// Scan of a table that does not need to be split across nodes: a plain OP_TableScan.
static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
  SPhyNode* pScan = createUserTableScanNode(pPlanNode, pTable, OP_TableScan);
  return pScan;
}

// Whether `pTable` is a system table. Placeholder: always false, the argument is ignored.
static bool isSystemTable(SQueryTableInfo* pTable) {
  // todo
  const bool systemTable = false;
  return systemTable;
}

// Whether the plan node requires a sequential table scan. Placeholder: always false,
// the argument is ignored.
static bool needSeqScan(SQueryPlanNode* pPlanNode) {
  // todo
  const bool seqScan = false;
  return seqScan;
}

// Picks the scan operator for a table scanned inside a per-vgroup subplan:
// system table -> pseudo scan (OP_SystemTableScan); otherwise a user-table scan that is
// sequential (OP_TableSeqScan) when needSeqScan() says so and OP_DataBlocksOptScan if not.
static SPhyNode* createMultiTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
  if (isSystemTable(pTable)) {
    return createPseudoScanNode(pPlanNode, pTable, OP_SystemTableScan);
  }
  const int32_t op = needSeqScan(pPlanNode) ? OP_TableSeqScan : OP_DataBlocksOptScan;
  return createUserTableScanNode(pPlanNode, pTable, op);
}

// Allocates a new subplan of the given type, links it into the DAG and makes it current.
//  - id: taken from pCxt->nextId, whose subplanId is then incremented.
//  - level: 0 when there is no current subplan, otherwise the current subplan's level + 1;
//    in that case the new subplan is also recorded as a child of the current subplan and
//    the current subplan as its parent.
//  - the subplan is appended to the DAG's array for its level (created on demand) and
//    pDag->numOfSubplans is incremented.
// Every allocation and array append is checked through vailidPointer(), so a failure is
// raised instead of leaving a half-linked subplan behind unnoticed.
// NOTE(review): this relies on taosArrayPush() returning NULL on failure -- confirm in tarray.h.
// Callers that need the previous current subplan back must save/restore it themselves.
static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
  SSubplan* subplan = vailidPointer(calloc(1, sizeof(SSubplan)));
  subplan->id = pCxt->nextId;
  ++(pCxt->nextId.subplanId);
  subplan->type = type;
  subplan->level = 0;
  if (NULL != pCxt->pCurrentSubplan) {
    subplan->level = pCxt->pCurrentSubplan->level + 1;
    if (NULL == pCxt->pCurrentSubplan->pChildern) {
      pCxt->pCurrentSubplan->pChildern = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
    }
    vailidPointer(taosArrayPush(pCxt->pCurrentSubplan->pChildern, &subplan));
    subplan->pParents = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
    vailidPointer(taosArrayPush(subplan->pParents, &pCxt->pCurrentSubplan));
  }
  SArray* currentLevel;
  if (subplan->level >= taosArrayGetSize(pCxt->pDag->pSubplans)) {
    // First subplan on this level: create the level's array and register it in the DAG.
    currentLevel = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
    vailidPointer(taosArrayPush(pCxt->pDag->pSubplans, &currentLevel));
  } else {
    currentLevel = taosArrayGetP(pCxt->pDag->pSubplans, subplan->level);
  }
  vailidPointer(taosArrayPush(currentLevel, &subplan));
  pCxt->pCurrentSubplan = subplan;
  ++(pCxt->pDag->numOfSubplans);
  return subplan;
}

X
Xiaoyu Wang 已提交
214 215 216 217 218 219 220 221 222 223 224
// Fills `epSet` with the endpoints (port + fqdn) listed in the vgroup info `vg`.
// The endpoint count is capped at the capacity of epSet->port and each fqdn copy is bounded
// by the destination buffer and always NUL-terminated, so an oversized count or host name
// can no longer write past the end of `epSet`.
// NOTE(review): the capacity math assumes SEpSet::port and SEpSet::fqdn[i] are fixed-size
// arrays, as the indexing and the former strcpy() imply -- confirm against the SEpSet definition.
static void vgroupInfoToEpSet(const SVgroupInfo* vg, SEpSet* epSet) {
  const int32_t maxEps = (int32_t)(sizeof(epSet->port) / sizeof(epSet->port[0]));
  epSet->inUse = 0; // todo
  epSet->numOfEps = vg->numOfEps;
  if (epSet->numOfEps > maxEps) {
    epSet->numOfEps = maxEps;
  }
  for (int32_t i = 0; i < epSet->numOfEps; ++i) {
    epSet->port[i] = vg->epAddr[i].port;
    strncpy(epSet->fqdn[i], vg->epAddr[i].fqdn, sizeof(epSet->fqdn[i]) - 1);
    epSet->fqdn[i][sizeof(epSet->fqdn[i]) - 1] = '\0';
  }
}

// Fills `epSet` with the endpoints (port + fqdn) listed in the vgroup message `vg`.
// The endpoint count is capped at the capacity of epSet->port and each fqdn copy is bounded
// by the destination buffer and always NUL-terminated, so an oversized count or host name
// can no longer write past the end of `epSet`.
// NOTE(review): the capacity math assumes SEpSet::port and SEpSet::fqdn[i] are fixed-size
// arrays, as the indexing and the former strcpy() imply -- confirm against the SEpSet definition.
static void vgroupMsgToEpSet(const SVgroupMsg* vg, SEpSet* epSet) {
  const int32_t maxEps = (int32_t)(sizeof(epSet->port) / sizeof(epSet->port[0]));
  epSet->inUse = 0; // todo
  epSet->numOfEps = vg->numOfEps;
  if (epSet->numOfEps > maxEps) {
    epSet->numOfEps = maxEps;
  }
  for (int32_t i = 0; i < epSet->numOfEps; ++i) {
    epSet->port[i] = vg->epAddr[i].port;
    strncpy(epSet->fqdn[i], vg->epAddr[i].fqdn, sizeof(epSet->fqdn[i]) - 1);
    epSet->fqdn[i][sizeof(epSet->fqdn[i]) - 1] = '\0';
  }
}

234
// Creates one QUERY_TYPE_SCAN subplan per vgroup of `pTable`. Each subplan gets the vgroup's
// endpoint set, a multi-table scan node and a dispatcher sink, and becomes a child of the
// subplan that was current on entry (the current subplan is restored after each one).
// Returns the template id shared by these subplans; pCxt->nextId.templateId is
// post-incremented, so the value returned is the one in effect while they were built.
static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
  SVgroupsInfo* vgroupList = pTable->pMeta->vgroupList;
  for (int32_t i = 0; i < vgroupList->numOfVgroups; ++i) {
    STORE_CURRENT_SUBPLAN(pCxt);
    SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN);
    vgroupMsgToEpSet(&(vgroupList->vgroups[i]), &subplan->execEpSet);
    subplan->pNode = createMultiTableScanNode(pPlanNode, pTable);
    subplan->pDataSink = createDataDispatcher(pCxt, pPlanNode);
    RECOVERY_CURRENT_SUBPLAN(pCxt);
  }
  return pCxt->nextId.templateId++;
}

247 248 249 250 251
// Creates an OP_Exchange node that records `srcTemplateId`, the template id of the
// subplans it is fed by.
static SPhyNode* createExchangeNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, uint64_t srcTemplateId) {
  SPhyNode* pBase = initPhyNode(pPlanNode, OP_Exchange, sizeof(SExchangePhyNode));
  ((SExchangePhyNode*)pBase)->srcTemplateId = srcTemplateId;
  return pBase;
}
252

253 254 255
// A table needs a multi-node scan when its meta data marks it as a super table.
static bool needMultiNodeScan(SQueryTableInfo* pTable) {
  // todo system table, for instance, user_tables
  if (TSDB_SUPER_TABLE == pTable->pMeta->pTableMeta->tableType) {
    return true;
  }
  return false;
}

258
// Translates a table-scan plan node. A table that needs a multi-node scan is first split
// into per-vgroup scan subplans, and an exchange node referring to their template id is
// returned; any other table becomes a single table-scan node.
static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
  SQueryTableInfo* pTable = (SQueryTableInfo*)pPlanNode->pExtInfo;
  if (!needMultiNodeScan(pTable)) {
    return createSingleTableScanNode(pPlanNode, pTable);
  }
  // The subplans are created before the exchange node that consumes them.
  const uint64_t srcTemplateId = splitSubplanByTable(pCxt, pPlanNode, pTable);
  return createExchangeNode(pCxt, pPlanNode, srcTemplateId);
}

// Recursively translates a logical plan node and its children into a physical operator tree.
// Returns the physical node, or NULL when the plan node has no physical operator (an insert
// node, or an unknown type in a build where assert() is compiled out). In that case the
// children are not translated, because there is no node to attach them to; previously this
// path dereferenced NULL.
// Children that yield no operator are skipped instead of being dereferenced. Failure to
// create or grow the child array is raised through vailidPointer().
// NOTE(review): the push check relies on taosArrayPush() returning NULL on failure -- confirm in tarray.h.
static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
  SPhyNode* node = NULL;
  switch (pPlanNode->info.type) {
    case QNODE_TAGSCAN:
      node = createTagScanNode(pPlanNode);
      break;
    case QNODE_TABLESCAN:
      node = createTableScanNode(pCxt, pPlanNode);
      break;
    case QNODE_INSERT:
      // Insert is not an operator in a physical plan.
      break;
    default:
      assert(false);
      break;
  }
  if (NULL == node) {
    return NULL;
  }
  if (NULL == pPlanNode->pChildren) {
    return node;
  }
  const size_t numOfChildren = taosArrayGetSize(pPlanNode->pChildren);
  if (0 == numOfChildren) {
    return node;
  }
  node->pChildren = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
  for (size_t i = 0; i < numOfChildren; ++i) {
    SPhyNode* child = createPhyNode(pCxt, taosArrayGetP(pPlanNode->pChildren, i));
    if (NULL == child) {
      continue;
    }
    child->pParent = node;
    vailidPointer(taosArrayPush(node->pChildren, &child));
  }
  return node;
}

X
Xiaoyu Wang 已提交
293 294 295 296
// Creates one QUERY_TYPE_MODIFY subplan per entry of the insert node's pExtInfo array
// (one SVgDataBlocks per vgroup). Each subplan has no operator tree (pNode == NULL), runs on
// the vgroup's endpoint set and writes through a data-inserter sink built from the blocks.
// The current subplan is restored after each subplan is created.
static void splitInsertSubplan(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
  SArray* pVgBlocks = (SArray*)pPlanNode->pExtInfo;
  const size_t numOfVg = taosArrayGetSize(pVgBlocks);
  int32_t vgIdx = 0;
  while (vgIdx < numOfVg) {
    STORE_CURRENT_SUBPLAN(pCxt);
    SSubplan* pModify = initSubplan(pCxt, QUERY_TYPE_MODIFY);
    SVgDataBlocks* pBlocks = (SVgDataBlocks*)taosArrayGetP(pVgBlocks, vgIdx);
    vgroupInfoToEpSet(&pBlocks->vg, &pModify->execEpSet);
    pModify->pNode = NULL;
    pModify->pDataSink = createDataInserter(pCxt, pBlocks);
    pModify->type = QUERY_TYPE_MODIFY;
    RECOVERY_CURRENT_SUBPLAN(pCxt);
    ++vgIdx;
  }
}

308
// Builds the subplans for the plan rooted at `pRoot`.
// An insert root is split into per-vgroup modify subplans. Any other root becomes a single
// QUERY_TYPE_MERGE subplan holding the translated operator tree and a dispatcher sink; the
// template id counter is advanced before the tree is translated.
// todo deal subquery
static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) {
  if (QNODE_INSERT == pRoot->info.type) {
    splitInsertSubplan(pCxt, pRoot);
    return;
  }
  SSubplan* pMerge = initSubplan(pCxt, QUERY_TYPE_MERGE);
  pCxt->nextId.templateId += 1;
  pMerge->pNode = createPhyNode(pCxt, pRoot);
  pMerge->pDataSink = createDataDispatcher(pCxt, pRoot);
}

// Builds the physical query DAG for the logical plan rooted at `pQueryNode`.
// Returns TSDB_CODE_SUCCESS with `*pDag` set to the new DAG, or TSDB_CODE_FAILED with
// `terrno` set to the code raised during planning.
// `*pDag` is reset to NULL up front, so if the DAG allocation itself fails the caller sees
// NULL rather than whatever `*pDag` held before. If a later step fails, `*pDag` still
// points at the partially built DAG.
int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag) {
  *pDag = NULL;
  TRY(TSDB_MAX_TAG_CONDITIONS) {
    SPlanContext context = {
      .pCatalog = pCatalog,
      .pDag = vailidPointer(calloc(1, sizeof(SQueryDag))),
      .pCurrentSubplan = NULL,
      .nextId = {0} // todo queryid
    };
    *pDag = context.pDag;
    context.pDag->pSubplans = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
    createSubplanByLevel(&context, pQueryNode);
  } CATCH(code) {
    CLEANUP_EXECUTE();
    terrno = code;
    return TSDB_CODE_FAILED;
  } END_TRY
  return TSDB_CODE_SUCCESS;
}
X
Xiaoyu Wang 已提交
338

D
dapan 已提交
339
// Placeholder: not implemented yet, all arguments are ignored.
// Returns TSDB_CODE_SUCCESS so the function has a defined result; it used to fall off the
// end of a non-void function, which is undefined behaviour as soon as a caller reads the value.
int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep) {
  //todo
  (void)subplan;
  (void)templateId;
  (void)ep;
  return TSDB_CODE_SUCCESS;
}