未验证 提交 cc538002 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #9206 from taosdata/feature/scheduler

Feature/scheduler
......@@ -50,13 +50,15 @@ typedef struct SQueryProfileSummary {
uint64_t resultSize; // generated result size in Kb.
} SQueryProfileSummary;
int32_t schedulerInit(SSchedulerCfg *cfg);
/**
* Process the query job, generated according to the query physical plan.
* This is a synchronized API, and is also thread-safety.
* @param pJob
* @param qnodeList Qnode address list, element is SEpAddr
* @return
*/
int32_t scheduleQueryJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pMgmtEps, SQueryDag* pDag, void** pJob);
int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob);
int32_t scheduleFetchRows(void *pJob, void **data);
......@@ -70,6 +72,8 @@ int32_t scheduleCancelJob(void *pJob);
void scheduleFreeJob(void *pJob);
void schedulerDestroy(void);
#ifdef __cplusplus
}
#endif
......
......@@ -11,3 +11,5 @@ target_link_libraries(
scheduler
PRIVATE os util planner qcom common catalog transport
)
ADD_SUBDIRECTORY(test)
\ No newline at end of file
......@@ -73,9 +73,8 @@ typedef struct SQueryJob {
SQueryProfileSummary summary;
SEpSet dataSrcEps;
SEpAddr resEp;
struct SCatalog *catalog;
void *rpc;
SEpSet *mgmtEpSet;
void *transport;
SArray *qnodeList;
tsem_t rspSem;
int32_t userFetch;
int32_t remoteFetch;
......
......@@ -58,8 +58,8 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) {
for (int32_t m = 0; m < level->taskNum; ++m) {
SQueryTask *task = taosArrayGet(level->subTasks, m);
SSubplan *plan = task->plan;
int32_t childNum = (int32_t)taosArrayGetSize(plan->pChildern);
int32_t parentNum = (int32_t)taosArrayGetSize(plan->pParents);
int32_t childNum = plan->pChildern ? (int32_t)taosArrayGetSize(plan->pChildern) : 0;
int32_t parentNum = plan->pParents ? (int32_t)taosArrayGetSize(plan->pParents) : 0;
if (childNum > 0) {
task->children = taosArrayInit(childNum, POINTER_BYTES);
......@@ -187,13 +187,19 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
for (int32_t n = 0; n < levelPlanNum; ++n) {
SSubplan *plan = taosArrayGet(levelPlans, n);
SQueryTask *task = taosArrayGet(level.subTasks, n);
SQueryTask task = {0};
task->taskId = atomic_add_fetch_64(&schMgmt.taskId, 1);
task->plan = plan;
task->status = SCH_STATUS_NOT_START;
task.taskId = atomic_add_fetch_64(&schMgmt.taskId, 1);
task.plan = plan;
task.status = SCH_STATUS_NOT_START;
if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &task, POINTER_BYTES)) {
void *p = taosArrayPush(level.subTasks, &task);
if (NULL == p) {
qError("taosArrayPush failed");
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &p, POINTER_BYTES)) {
qError("taosHashPut failed");
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
......@@ -225,20 +231,27 @@ _return:
SCH_RET(code);
}
int32_t schAvailableEpSet(SQueryJob *job, SEpSet *epSet) {
int32_t schSetTaskExecEpSet(SQueryJob *job, SEpSet *epSet) {
if (epSet->numOfEps >= SCH_MAX_CONDIDATE_EP_NUM) {
return TSDB_CODE_SUCCESS;
}
if (SCH_HAS_QNODE_IN_CLUSTER(schMgmt.cfg.clusterType)) {
SCH_ERR_RET(catalogGetQnodeList(job->catalog, job->rpc, job->mgmtEpSet, epSet));
} else {
for (int32_t i = 0; i < job->dataSrcEps.numOfEps; ++i) {
strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i]));
epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i];
++epSet->numOfEps;
}
int32_t qnodeNum = taosArrayGetSize(job->qnodeList);
for (int32_t i = 0; i < qnodeNum && epSet->numOfEps < tListLen(epSet->port); ++i) {
SEpAddr *addr = taosArrayGet(job->qnodeList, i);
strncpy(epSet->fqdn[epSet->numOfEps], addr->fqdn, sizeof(addr->fqdn));
epSet->port[epSet->numOfEps] = addr->port;
++epSet->numOfEps;
}
for (int32_t i = 0; i < job->dataSrcEps.numOfEps && epSet->numOfEps < tListLen(epSet->port); ++i) {
strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i]));
epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i];
++epSet->numOfEps;
}
return TSDB_CODE_SUCCESS;
......@@ -509,7 +522,12 @@ int32_t schLaunchTask(SQueryJob *job, SQueryTask *task) {
SCH_ERR_RET(qSubPlanToString(plan, &task->msg));
if (plan->execEpSet.numOfEps <= 0) {
SCH_ERR_RET(schAvailableEpSet(job, &plan->execEpSet));
SCH_ERR_RET(schSetTaskExecEpSet(job, &plan->execEpSet));
}
if (plan->execEpSet.numOfEps <= 0) {
SCH_TASK_ERR_LOG("invalid execEpSet num:%d", plan->execEpSet.numOfEps);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SCH_ERR_RET(schAsyncSendMsg(job, task, TSDB_MSG_TYPE_QUERY));
......@@ -548,20 +566,23 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
}
int32_t scheduleQueryJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pMgmtEps, SQueryDag* pDag, void** pJob) {
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob) {
if (NULL == transport || NULL == transport ||NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
if (taosArrayGetSize(qnodeList) <= 0) {
qInfo("qnodeList is empty");
}
int32_t code = 0;
SQueryJob *job = calloc(1, sizeof(SQueryJob));
if (NULL == job) {
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
job->catalog = pCatalog;
job->rpc = pRpc;
job->mgmtEpSet = (SEpSet *)pMgmtEps;
job->transport = transport;
job->qnodeList = qnodeList;
SCH_ERR_JRET(schValidateAndBuildJob(pDag, job));
......
MESSAGE(STATUS "build scheduler unit test")
# GoogleTest requires at least C++11
SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(schedulerTest ${SOURCE_LIST})
TARGET_LINK_LIBRARIES(
schedulerTest
PUBLIC os util common catalog transport gtest qcom taos planner scheduler
)
TARGET_INCLUDE_DIRECTORIES(
schedulerTest
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/scheduler/"
PRIVATE "${CMAKE_SOURCE_DIR}/source/libs/scheduler/inc"
)
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <tglobal.h>
#include <iostream>
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#include "os.h"
#include "taos.h"
#include "tdef.h"
#include "tvariant.h"
#include "catalog.h"
#include "scheduler.h"
#include "tep.h"
#include "trpc.h"
namespace {
void mockBuildDag(SQueryDag *dag) {
uint64_t qId = 0x111111111111;
dag->queryId = qId;
dag->numOfSubplans = 2;
dag->pSubplans = taosArrayInit(dag->numOfSubplans, POINTER_BYTES);
SArray *scan = taosArrayInit(1, sizeof(SSubplan));
SArray *merge = taosArrayInit(1, sizeof(SSubplan));
SSubplan scanPlan = {0};
SSubplan mergePlan = {0};
scanPlan.id.queryId = qId;
scanPlan.id.templateId = 0x2222222222;
scanPlan.id.subplanId = 0x3333333333;
scanPlan.type = QUERY_TYPE_SCAN;
scanPlan.level = 1;
scanPlan.execEpSet.numOfEps = 1;
scanPlan.pChildern = NULL;
scanPlan.pParents = taosArrayInit(1, POINTER_BYTES);
mergePlan.id.queryId = qId;
mergePlan.id.templateId = 0x4444444444;
mergePlan.id.subplanId = 0x5555555555;
mergePlan.type = QUERY_TYPE_MERGE;
mergePlan.level = 0;
mergePlan.execEpSet.numOfEps = 1;
mergePlan.pChildern = taosArrayInit(1, POINTER_BYTES);
mergePlan.pParents = NULL;
SSubplan *mergePointer = (SSubplan *)taosArrayPush(merge, &mergePlan);
SSubplan *scanPointer = (SSubplan *)taosArrayPush(scan, &scanPlan);
taosArrayPush(mergePointer->pChildern, &scanPointer);
taosArrayPush(scanPointer->pParents, &mergePointer);
taosArrayPush(dag->pSubplans, &merge);
taosArrayPush(dag->pSubplans, &scan);
}
}
TEST(testCase, normalCase) {
void *mockPointer = (void *)0x1;
char *clusterId = "cluster1";
char *dbname = "1.db1";
char *tablename = "table1";
SVgroupInfo vgInfo = {0};
void *pJob = NULL;
SQueryDag dag = {0};
SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr));
int32_t code = schedulerInit(NULL);
ASSERT_EQ(code, 0);
mockBuildDag(&dag);
code = scheduleExecJob(mockPointer, qnodeList, &dag, &pJob);
ASSERT_EQ(code, 0);
}
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册