提交 2e947223 编写于 作者: H Hongze Cheng

Merge branch 'feature/3.0_liaohj' of github.com:taosdata/TDengine into feature/vnode

......@@ -119,11 +119,12 @@ typedef struct SSubplanId {
} SSubplanId;
typedef struct SSubplan {
SSubplanId id; // unique id of the subplan
SSubplanId id; // unique id of the subplan
int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN|QUERY_TYPE_MODIFY
int32_t level; // the execution level of current subplan, starting from 0.
int32_t msgType; // message type for subplan, used to denote the send message type to vnode.
int32_t level; // the execution level of current subplan, starting from 0 in a top-down manner.
SEpSet execEpSet; // for the scan/modify subplan, the optional execution node
SArray *pChildern; // the datasource subplan,from which to fetch the result
SArray *pChildren; // the datasource subplan,from which to fetch the result
SArray *pParents; // the data destination subplan, get data from current subplan
SPhyNode *pNode; // physical plan of current subplan
SDataSink *pDataSink; // data of the subplan flow into the datasink
......
......@@ -140,7 +140,9 @@ TAOS_ROW taos_fetch_row(TAOS_RES *pRes) {
SRequestObj *pRequest = (SRequestObj *) pRes;
if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
pRequest->type == TSDB_SQL_INSERT || pRequest->code != TSDB_CODE_SUCCESS) {
pRequest->type == TSDB_SQL_INSERT ||
pRequest->code != TSDB_CODE_SUCCESS ||
taos_num_fields(pRes) == 0) {
return NULL;
}
......
......@@ -279,7 +279,7 @@ TEST(testCase, connect_Test) {
// taos_free_result(pRes);
// taos_close(pConn);
//}
//
//TEST(testCase, create_table_Test) {
// // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// // assert(pConn != NULL);
......@@ -470,9 +470,9 @@ TEST(testCase, create_multiple_tables) {
TAOS_RES* pRes = taos_query(pConn, "use abc1");
taos_free_result(pRes);
pRes = taos_query(pConn, "create table t_2 using st1 tags(1) t_3 using st1 tags(2)");
pRes = taos_query(pConn, "create table t_2 using st1 tags(1)");
if (taos_errno(pRes) != 0) {
printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes));
printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
ASSERT_TRUE(false);
}
......
......@@ -392,7 +392,7 @@ create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) LP columnlist(X) RP. {
%type create_stable_args{SCreateTableSql*}
create_stable_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) LP columnlist(X) RP TAGS LP columnlist(Y) RP. {
A = tSetCreateTableInfo(X, Y, NULL, TSQL_CREATE_STABLE);
setSqlInfo(pInfo, A, NULL, TSDB_SQL_CREATE_TABLE);
setSqlInfo(pInfo, A, NULL, TSDB_SQL_CREATE_STABLE);
V.n += Z.n;
setCreatedTableName(pInfo, &V, &U);
......
......@@ -785,7 +785,7 @@ void destroySqlInfo(SSqlInfo *pInfo) {
taosArrayDestroy(pInfo->funcs);
if (pInfo->type == TSDB_SQL_SELECT) {
destroyAllSqlNode(&pInfo->sub);
} else if (pInfo->type == TSDB_SQL_CREATE_TABLE) {
} else if (pInfo->type == TSDB_SQL_CREATE_STABLE) {
pInfo->pCreateTableInfo = destroyCreateTableSql(pInfo->pCreateTableInfo);
} else if (pInfo->type == TSDB_SQL_ALTER_TABLE) {
taosArrayDestroyEx(pInfo->pAlterInfo->varList, freeItem);
......
......@@ -761,8 +761,9 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, c
break;
}
case TSDB_SQL_CREATE_TABLE: {
case TSDB_SQL_CREATE_STABLE: {
SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo;
assert(pCreateTable->type != TSQL_CREATE_CTABLE);
if (pCreateTable->type == TSQL_CREATE_TABLE || pCreateTable->type == TSQL_CREATE_STABLE) {
if ((code = doCheckForCreateTable(pInfo, pMsgBuf)) != TSDB_CODE_SUCCESS) {
......@@ -772,13 +773,6 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, c
pDcl->pMsg = (char*)buildCreateTableMsg(pCreateTable, &pDcl->msgLen, pCtx, pMsgBuf);
pDcl->msgType = (pCreateTable->type == TSQL_CREATE_TABLE) ? TDMT_VND_CREATE_TABLE : TDMT_MND_CREATE_STB;
} else if (pCreateTable->type == TSQL_CREATE_CTABLE) {
if ((code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, &pDcl->pMsg, &pDcl->msgLen)) !=
TSDB_CODE_SUCCESS) {
goto _error;
}
pDcl->msgType = TDMT_VND_CREATE_TABLE;
} else if (pCreateTable->type == TSQL_CREATE_STREAM) {
// if ((code = doCheckForStream(pSql, pInfo)) != TSDB_CODE_SUCCESS) {
// return code;
......
此差异已折叠。
......@@ -70,6 +70,11 @@ typedef struct SQueryPlanNode {
struct SQueryPlanNode *pParent;
} SQueryPlanNode;
typedef struct SDataPayloadInfo {
int32_t msgType;
SArray *payload;
} SDataPayloadInfo;
/**
* Optimize the query execution plan, currently not implement yet.
* @param pQueryNode
......
......@@ -37,18 +37,28 @@ int32_t optimizeQueryPlan(struct SQueryPlanNode* pQueryNode) {
return 0;
}
static int32_t createInsertPlan(const SQueryNode* pNode, SQueryPlanNode** pQueryPlan) {
static int32_t createModificationOpPlan(const SQueryNode* pNode, SQueryPlanNode** pQueryPlan) {
SInsertStmtInfo* pInsert = (SInsertStmtInfo*)pNode;
*pQueryPlan = calloc(1, sizeof(SQueryPlanNode));
SArray* blocks = taosArrayInit(taosArrayGetSize(pInsert->pDataBlocks), POINTER_BYTES);
if (NULL == *pQueryPlan || NULL == blocks) {
SDataPayloadInfo* pPayload = calloc(1, sizeof(SDataPayloadInfo));
if (NULL == *pQueryPlan || NULL == blocks || NULL == pPayload) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
(*pQueryPlan)->info.type = QNODE_MODIFY;
taosArrayAddAll(blocks, pInsert->pDataBlocks);
(*pQueryPlan)->pExtInfo = blocks;
if (pNode->type == TSDB_SQL_INSERT) {
pPayload->msgType = TDMT_VND_SUBMIT;
} else if (pNode->type == TSDB_SQL_CREATE_TABLE) {
pPayload->msgType = TDMT_VND_CREATE_TABLE;
}
pPayload->payload = blocks;
(*pQueryPlan)->pExtInfo = pPayload;
return TSDB_CODE_SUCCESS;
}
......@@ -69,7 +79,7 @@ int32_t createQueryPlan(const SQueryNode* pNode, SQueryPlanNode** pQueryPlan) {
case TSDB_SQL_INSERT:
case TSDB_SQL_CREATE_TABLE:
return createInsertPlan(pNode, pQueryPlan);
return createModificationOpPlan(pNode, pQueryPlan);
default:
return TSDB_CODE_FAILED;
......
......@@ -191,13 +191,15 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
subplan->level = 0;
if (NULL != pCxt->pCurrentSubplan) {
subplan->level = pCxt->pCurrentSubplan->level + 1;
if (NULL == pCxt->pCurrentSubplan->pChildern) {
pCxt->pCurrentSubplan->pChildern = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
if (NULL == pCxt->pCurrentSubplan->pChildren) {
pCxt->pCurrentSubplan->pChildren = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
}
taosArrayPush(pCxt->pCurrentSubplan->pChildern, &subplan);
taosArrayPush(pCxt->pCurrentSubplan->pChildren, &subplan);
subplan->pParents = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
taosArrayPush(subplan->pParents, &pCxt->pCurrentSubplan);
}
SArray* currentLevel;
if (subplan->level >= taosArrayGetSize(pCxt->pDag->pSubplans)) {
currentLevel = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
......@@ -205,6 +207,7 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
} else {
currentLevel = taosArrayGetP(pCxt->pDag->pSubplans, subplan->level);
}
taosArrayPush(currentLevel, &subplan);
pCxt->pCurrentSubplan = subplan;
++(pCxt->pDag->numOfSubplans);
......@@ -278,6 +281,7 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
default:
assert(false);
}
if (pPlanNode->pChildren != NULL && taosArrayGetSize(pPlanNode->pChildren) > 0) {
node->pChildren = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
size_t size = taosArrayGetSize(pPlanNode->pChildren);
......@@ -287,31 +291,38 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
taosArrayPush(node->pChildren, &child);
}
}
return node;
}
static void splitInsertSubplan(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
SArray* vgs = (SArray*)pPlanNode->pExtInfo;
size_t numOfVg = taosArrayGetSize(vgs);
for (int32_t i = 0; i < numOfVg; ++i) {
static void splitModificationOpSubPlan(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
SDataPayloadInfo* pPayload = (SDataPayloadInfo*) pPlanNode->pExtInfo;
size_t numOfVgroups = taosArrayGetSize(pPayload->payload);
for (int32_t i = 0; i < numOfVgroups; ++i) {
STORE_CURRENT_SUBPLAN(pCxt);
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MODIFY);
SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(vgs, i);
SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(pPayload->payload, i);
vgroupInfoToEpSet(&blocks->vg, &subplan->execEpSet);
subplan->pNode = NULL;
subplan->pDataSink = createDataInserter(pCxt, blocks);
subplan->type = QUERY_TYPE_MODIFY;
subplan->pNode = NULL;
subplan->type = QUERY_TYPE_MODIFY;
subplan->msgType = pPayload->msgType;
RECOVERY_CURRENT_SUBPLAN(pCxt);
}
}
static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) {
if (QNODE_MODIFY == pRoot->info.type) {
splitInsertSubplan(pCxt, pRoot);
splitModificationOpSubPlan(pCxt, pRoot);
} else {
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MERGE);
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MERGE);
++(pCxt->nextId.templateId);
subplan->pNode = createPhyNode(pCxt, pRoot);
subplan->msgType = TDMT_VND_QUERY;
subplan->pNode = createPhyNode(pCxt, pRoot);
subplan->pDataSink = createDataDispatcher(pCxt, pRoot);
}
// todo deal subquery
......@@ -325,6 +336,7 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD
.pCurrentSubplan = NULL,
.nextId = {0} // todo queryid
};
*pDag = context.pDag;
context.pDag->pSubplans = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
createSubplanByLevel(&context, pQueryNode);
......
......@@ -793,7 +793,7 @@ static cJSON* subplanToJson(const SSubplan* subplan) {
return NULL;
}
// The 'type', 'level', 'execEpSet', 'pChildern' and 'pParents' fields do not need to be serialized.
// The 'type', 'level', 'execEpSet', 'pChildren' and 'pParents' fields do not need to be serialized.
bool res = addObject(jSubplan, jkSubplanId, subplanIdToJson, &subplan->id);
if (res) {
......@@ -835,7 +835,7 @@ int32_t subPlanToString(const SSubplan* subplan, char** str, int32_t* len) {
SDataInserter* insert = (SDataInserter*)(subplan->pDataSink);
*len = insert->size;
*str = insert->pData;
insert->pData == NULL;
insert->pData = NULL;
return TSDB_CODE_SUCCESS;
}
......@@ -844,6 +844,7 @@ int32_t subPlanToString(const SSubplan* subplan, char** str, int32_t* len) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
*str = cJSON_Print(json);
*len = strlen(*str) + 1;
return TSDB_CODE_SUCCESS;
......
......@@ -31,17 +31,20 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag)
destroyQueryPlan(logicPlan);
return code;
}
code = optimizeQueryPlan(logicPlan);
if (TSDB_CODE_SUCCESS != code) {
destroyQueryPlan(logicPlan);
return code;
}
code = createDag(logicPlan, NULL, pDag);
if (TSDB_CODE_SUCCESS != code) {
destroyQueryPlan(logicPlan);
qDestroyQueryDag(*pDag);
return code;
}
destroyQueryPlan(logicPlan);
return TSDB_CODE_SUCCESS;
}
......
......@@ -28,7 +28,7 @@ int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) {
for (int32_t m = 0; m < level->taskNum; ++m) {
SSchTask *task = taosArrayGet(level->subTasks, m);
SSubplan *plan = task->plan;
int32_t childNum = plan->pChildern ? (int32_t)taosArrayGetSize(plan->pChildern) : 0;
int32_t childNum = plan->pChildren ? (int32_t)taosArrayGetSize(plan->pChildren) : 0;
int32_t parentNum = plan->pParents ? (int32_t)taosArrayGetSize(plan->pParents) : 0;
if (childNum > 0) {
......@@ -40,7 +40,7 @@ int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) {
}
for (int32_t n = 0; n < childNum; ++n) {
SSubplan **child = taosArrayGet(plan->pChildern, n);
SSubplan **child = taosArrayGet(plan->pChildren, n);
SSchTask **childTask = taosHashGet(planToTask, child, POINTER_BYTES);
if (NULL == childTask || NULL == *childTask) {
qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
......@@ -122,6 +122,7 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) {
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
//??
job->attr.needFetch = true;
job->levelNum = levelNum;
......@@ -547,22 +548,29 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in
_return:
tfree(param);
SCH_RET(code);
}
int32_t schHandleSubmitCallback(void* param, const SDataBuf* pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_SUBMIT_RSP, code);
}
int32_t schHandleCreateTableCallback(void* param, const SDataBuf* pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_CREATE_TABLE_RSP, code);
}
int32_t schHandleQueryCallback(void* param, const SDataBuf* pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code);
}
int32_t schHandleFetchCallback(void* param, const SDataBuf* pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_FETCH_RSP, code);
}
int32_t schHandleReadyCallback(void* param, const SDataBuf* pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_RES_READY_RSP, code);
}
int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) {
SSchCallbackParam *pParam = (SSchCallbackParam *)param;
qDebug("drop task rsp received, queryId:%"PRIx64 ",taksId:%"PRIx64 ",code:%d", pParam->queryId, pParam->taskId, code);
......@@ -570,6 +578,9 @@ int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
switch (msgType) {
case TDMT_VND_CREATE_TABLE:
*fp = schHandleCreateTableCallback;
break;
case TDMT_VND_SUBMIT:
*fp = schHandleSubmitCallback;
break;
......@@ -640,6 +651,7 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
int32_t code = 0;
switch (msgType) {
case TDMT_VND_CREATE_TABLE:
case TDMT_VND_SUBMIT: {
if (NULL == task->msg || task->msgLen <= 0) {
qError("submit msg is NULL");
......@@ -746,19 +758,15 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task) {
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
int32_t msgType = (plan->type == QUERY_TYPE_MODIFY) ? TDMT_VND_SUBMIT : TDMT_VND_QUERY;
// int32_t msgType = (plan->type == QUERY_TYPE_MODIFY)? TDMT_VND_SUBMIT : TDMT_VND_QUERY;
SCH_ERR_RET(schBuildAndSendMsg(job, task, msgType));
SCH_ERR_RET(schBuildAndSendMsg(job, task, plan->msgType));
SCH_ERR_RET(schPushTaskToExecList(job, task));
task->status = JOB_TASK_STATUS_EXECUTING;
return TSDB_CODE_SUCCESS;
}
int32_t schLaunchJob(SSchJob *job) {
SSchLevel *level = taosArrayGet(job->levels, job->levelIdx);
for (int32_t i = 0; i < level->taskNum; ++i) {
......
......@@ -58,7 +58,7 @@ void schtBuildQueryDag(SQueryDag *dag) {
scanPlan.execEpSet.numOfEps = 1;
scanPlan.execEpSet.port[0] = 6030;
strcpy(scanPlan.execEpSet.fqdn[0], "ep0");
scanPlan.pChildern = NULL;
scanPlan.pChildren = NULL;
scanPlan.pParents = taosArrayInit(1, POINTER_BYTES);
scanPlan.pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode));
......@@ -68,14 +68,14 @@ void schtBuildQueryDag(SQueryDag *dag) {
mergePlan.type = QUERY_TYPE_MERGE;
mergePlan.level = 0;
mergePlan.execEpSet.numOfEps = 0;
mergePlan.pChildern = taosArrayInit(1, POINTER_BYTES);
mergePlan.pChildren = taosArrayInit(1, POINTER_BYTES);
mergePlan.pParents = NULL;
mergePlan.pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode));
SSubplan *mergePointer = (SSubplan *)taosArrayPush(merge, &mergePlan);
SSubplan *scanPointer = (SSubplan *)taosArrayPush(scan, &scanPlan);
taosArrayPush(mergePointer->pChildern, &scanPointer);
taosArrayPush(mergePointer->pChildren, &scanPointer);
taosArrayPush(scanPointer->pParents, &mergePointer);
taosArrayPush(dag->pSubplans, &merge);
......@@ -100,7 +100,7 @@ void schtBuildInsertDag(SQueryDag *dag) {
insertPlan[0].execEpSet.numOfEps = 1;
insertPlan[0].execEpSet.port[0] = 6030;
strcpy(insertPlan[0].execEpSet.fqdn[0], "ep0");
insertPlan[0].pChildern = NULL;
insertPlan[0].pChildren = NULL;
insertPlan[0].pParents = NULL;
insertPlan[0].pNode = NULL;
insertPlan[0].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink));
......@@ -113,7 +113,7 @@ void schtBuildInsertDag(SQueryDag *dag) {
insertPlan[1].execEpSet.numOfEps = 1;
insertPlan[1].execEpSet.port[0] = 6030;
strcpy(insertPlan[1].execEpSet.fqdn[0], "ep1");
insertPlan[1].pChildern = NULL;
insertPlan[1].pChildren = NULL;
insertPlan[1].pParents = NULL;
insertPlan[1].pNode = NULL;
insertPlan[1].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册