提交 454a488c 编写于 作者: D dapan1121

feature/qnode

上级 bbada5a8
......@@ -137,7 +137,7 @@ int32_t catalogGetTableHashVgroup(struct SCatalog* pCatalog, void * pTransporter
int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp);
int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SEpSet* pQnodeEpSet);
int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray* pQnodeList);
......
......@@ -59,7 +59,15 @@ int32_t schedulerInit(SSchedulerCfg *cfg);
* @param qnodeList Qnode address list, element is SEpAddr
* @return
*/
int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob);
int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob, uint64_t *numOfRows);
/**
* Process the query job, generated according to the query physical plan.
* This is a asynchronized API, and is also thread-safety.
* @param qnodeList Qnode address list, element is SEpAddr
* @return
*/
int32_t scheduleAsyncExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob);
int32_t scheduleFetchRows(void *pJob, void **data);
......@@ -79,4 +87,4 @@ void schedulerDestroy(void);
}
#endif
#endif /*_TD_SCHEDULER_H_*/
\ No newline at end of file
#endif /*_TD_SCHEDULER_H_*/
......@@ -634,8 +634,8 @@ _return:
CTG_RET(code);
}
int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SEpSet* pQnodeEpSet) {
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pQnodeEpSet) {
int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray* pQnodeList) {
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pQnodeList) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
......
......@@ -53,6 +53,12 @@ typedef struct STscObj {
namespace {
int32_t ctgTestVgNum = 10;
char *ctgTestClusterId = "cluster1";
char *ctgTestDbname = "1.db1";
char *ctgTestTablename = "table1";
void sendCreateDbMsg(void *shandle, SEpSet *pEpSet) {
SCreateDbMsg* pReq = (SCreateDbMsg*)rpcMallocCont(sizeof(SCreateDbMsg));
strcpy(pReq->db, "1.db1");
......@@ -91,6 +97,34 @@ void sendCreateDbMsg(void *shandle, SEpSet *pEpSet) {
void __rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
SUseDbRsp *rspMsg = NULL; //todo
pRsp->code =0;
pRsp->contLen = sizeof(SUseDbRsp) + ctgTestVgNum * sizeof(SVgroupInfo);
pRsp->pCont = calloc(1, pRsp->contLen);
rspMsg = (SUseDbRsp *)pRsp->pCont;
strcpy(rspMsg->db, ctgTestDbname);
rspMsg->vgVersion = 1;
rspMsg->vgNum = ctgTestVgNum;
rspMsg->hashMethod = 0;
SVgroupInfo *vg = NULL;
uint32_t hashUnit = UINT32_MAX / ctgTestVgNum;
for (int32_t i = 0; i < ctgTestVgNum; ++i) {
vg = &rspMsg->vgroupInfo[i];
vg->vgId = i + 1;
vg->hashBegin = i * hashUnit;
vg->hashEnd = hashUnit * (i + 1) - 1;
vg->numOfEps = i % TSDB_MAX_REPLICA + 1;
vg->inUse = i % vg->numOfEps;
for (int32_t n = 0; n < vg->numOfEps; ++n) {
SEpAddrMsg *addr = &vg->epAddr[n];
strcpy(addr->fqdn, "a0");
addr->port = n + 22;
}
}
vg->hashEnd = UINT32_MAX;
return;
}
......@@ -112,30 +146,24 @@ void initTestEnv() {
}
TEST(testCase, normalCase) {
STscObj* pConn = (STscObj *)taos_connect("127.0.0.1", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
char *clusterId = "cluster1";
char *dbname = "1.db1";
char *tablename = "table1";
struct SCatalog* pCtg = NULL;
void *mockPointer = (void *)0x1;
SVgroupInfo vgInfo = {0};
initTestEnv();
initQueryModuleMsgHandle();
sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
//sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
int32_t code = catalogInit(NULL);
ASSERT_EQ(code, 0);
code = catalogGetHandle(clusterId, &pCtg);
code = catalogGetHandle(ctgTestClusterId, &pCtg);
ASSERT_EQ(code, 0);
code = catalogGetTableHashVgroup(pCtg, pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet, dbname, tablename, &vgInfo);
code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestTablename, &vgInfo);
ASSERT_EQ(code, 0);
taos_close(pConn);
}
......
......@@ -10,3 +10,5 @@ target_link_libraries(
qworker
PRIVATE os util transport planner qcom
)
ADD_SUBDIRECTORY(test)
\ No newline at end of file
MESSAGE(STATUS "build qworker unit test")
# GoogleTest requires at least C++11
SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(qworkerTest ${SOURCE_LIST})
TARGET_LINK_LIBRARIES(
qworkerTest
PUBLIC os util common transport gtest qcom
)
TARGET_INCLUDE_DIRECTORIES(
qworkerTest
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/qworker/"
PRIVATE "${CMAKE_SOURCE_DIR}/source/libs/qworker/inc"
)
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <tglobal.h>
#include <iostream>
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#include "os.h"
#include "taos.h"
#include "tdef.h"
#include "tvariant.h"
#include "tep.h"
#include "trpc.h"
#include "stub.h"
#include "addr_any.h"
namespace {
}
void __rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
SUseDbRsp *rspMsg = NULL; //todo
return;
}
void initTestEnv() {
static Stub stub;
stub.set(rpcSendRecv, __rpcSendRecv);
{
AddrAny any("libtransport.so");
std::map<std::string,void*> result;
any.get_global_func_addr_dynsym("^rpcSendRecv$", result);
for (const auto& f : result) {
stub.set(f.second, __rpcSendRecv);
}
}
}
TEST(testCase, normalCase) {
}
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
......@@ -68,11 +68,17 @@ typedef struct SQueryTask {
SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask*
} SQueryTask;
typedef struct SQueryJobAttr {
bool needFetch;
bool syncQuery;
} SQueryJobAttr;
typedef struct SQueryJob {
uint64_t queryId;
int32_t levelNum;
int32_t levelIdx;
int8_t status;
SQueryJobAttr attr;
SQueryProfileSummary summary;
SEpSet dataSrcEps;
SEpAddr resEp;
......@@ -81,8 +87,10 @@ typedef struct SQueryJob {
tsem_t rspSem;
int32_t userFetch;
int32_t remoteFetch;
void *res;
void *res;
int32_t resNumOfRows;
SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask*
SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask*
SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask*
......
......@@ -152,6 +152,8 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
job->attr.needFetch = true;
job->levelNum = levelNum;
job->levelIdx = levelNum - 1;
......@@ -196,6 +198,10 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
for (int32_t n = 0; n < levelPlanNum; ++n) {
SSubplan *plan = taosArrayGet(levelPlans, n);
SQueryTask task = {0};
if (plan->type == QUERY_TYPE_MODIFY) {
job->attr.needFetch = false;
}
task.taskId = atomic_add_fetch_64(&schMgmt.taskId, 1);
task.plan = plan;
......@@ -424,8 +430,12 @@ _return:
}
int32_t schProcessOnJobSuccess(SQueryJob *job) {
job->status = JOB_TASK_STATUS_SUCCEED;
int32_t schProcessOnJobPartialSuccess(SQueryJob *job) {
job->status = JOB_TASK_STATUS_PARTIAL_SUCCEED;
if ((!job->attr.needFetch) && job->attr.syncQuery) {
tsem_post(&job->rspSem);
}
if (job->userFetch) {
SCH_ERR_RET(schFetchFromRemote(job));
......@@ -495,7 +505,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) {
job->resEp.port = task->execAddr.port;
}
SCH_ERR_RET(schProcessOnJobSuccess(job));
SCH_ERR_RET(schProcessOnJobPartialSuccess(job));
return TSDB_CODE_SUCCESS;
}
......@@ -560,34 +570,63 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod
return TSDB_CODE_SUCCESS;
}
int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, int32_t rspCode) {
int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) {
int32_t code = 0;
switch (msgType) {
case TSDB_MSG_TYPE_QUERY:
if (rspCode != TSDB_CODE_SUCCESS) {
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode));
} else {
code = schAsyncSendMsg(job, task, TSDB_MSG_TYPE_RES_READY);
if (code) {
goto _task_error;
case TSDB_MSG_TYPE_SUBMIT: {
SShellSubmitRspMsg *rsp = (SShellSubmitRspMsg *)msg;
if (rsp->code != TSDB_CODE_SUCCESS) {
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code));
} else {
job->resNumOfRows += rsp->numOfRows;
code = schProcessOnTaskSuccess(job, task);
if (code) {
goto _task_error;
}
}
break;
}
break;
case TSDB_MSG_TYPE_RES_READY:
if (rspCode != TSDB_CODE_SUCCESS) {
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode));
} else {
code = schProcessOnTaskSuccess(job, task);
if (code) {
goto _task_error;
}
case TSDB_MSG_TYPE_QUERY: {
SQueryTableRsp *rsp = (SQueryTableRsp *)msg;
if (rsp->code != TSDB_CODE_SUCCESS) {
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code));
} else {
code = schAsyncSendMsg(job, task, TSDB_MSG_TYPE_RES_READY);
if (code) {
goto _task_error;
}
}
break;
}
case TSDB_MSG_TYPE_RES_READY: {
SResReadyRsp *rsp = (SResReadyRsp *)msg;
if (rsp->code != TSDB_CODE_SUCCESS) {
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code));
} else {
code = schProcessOnTaskSuccess(job, task);
if (code) {
goto _task_error;
}
}
break;
}
case TSDB_MSG_TYPE_FETCH: {
SCH_ERR_JRET(rspCode);
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
if (rsp->completed) {
job->status = JOB_TASK_STATUS_SUCCEED;
}
job->res = rsp;
job->resNumOfRows = rsp->numOfRows;
SCH_ERR_JRET(schProcessOnDataFetched(job));
break;
}
break;
case TSDB_MSG_TYPE_FETCH:
SCH_ERR_JRET(rspCode);
SCH_ERR_JRET(schProcessOnDataFetched(job));
break;
default:
qError("unknown msg type:%d received", msgType);
return TSDB_CODE_QRY_INVALID_INPUT;
......@@ -680,7 +719,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
}
int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob) {
int32_t scheduleExecJobImpl(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob, bool syncQuery) {
if (NULL == transport || NULL == transport ||NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
......@@ -695,6 +734,7 @@ int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, voi
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
job->attr.syncQuery = syncQuery;
job->transport = transport;
job->qnodeList = qnodeList;
......@@ -719,10 +759,16 @@ int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, voi
}
tsem_init(&job->rspSem, 0, 0);
if (0 != taosHashPut(schMgmt.jobs, &job->queryId, sizeof(job->queryId), &job, POINTER_BYTES)) {
qError("taosHashPut queryId:%"PRIx64" failed", job->queryId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
code = taosHashPut(schMgmt.jobs, &job->queryId, sizeof(job->queryId), &job, POINTER_BYTES);
if (0 != code) {
if (HASH_NODE_EXIST(code)) {
qError("taosHashPut queryId:%"PRIx64" already exist", job->queryId);
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
} else {
qError("taosHashPut queryId:%"PRIx64" failed", job->queryId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
}
job->status = JOB_TASK_STATUS_NOT_START;
......@@ -731,6 +777,10 @@ int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, voi
*(SQueryJob **)pJob = job;
if (syncQuery) {
tsem_wait(&job->rspSem);
}
return TSDB_CODE_SUCCESS;
_return:
......@@ -741,20 +791,47 @@ _return:
SCH_RET(code);
}
int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob, uint64_t *numOfRows) {
*numOfRows = 0;
SCH_ERR_RET(scheduleExecJobImpl(transport, qnodeList, pDag, pJob, true));
SQueryJob *job = *(SQueryJob **)pJob;
*numOfRows = job->resNumOfRows;
return TSDB_CODE_SUCCESS;
}
int32_t scheduleAsyncExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob) {
return scheduleExecJobImpl(transport, qnodeList, pDag, pJob, false);
}
int32_t scheduleFetchRows(void *pJob, void **data) {
if (NULL == pJob || NULL == data) {
return TSDB_CODE_QRY_INVALID_INPUT;
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SQueryJob *job = pJob;
int32_t code = 0;
if (!job->attr.needFetch) {
qError("no need to fetch data");
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
if (job->status == JOB_TASK_STATUS_SUCCEED) {
job->res = NULL;
return TSDB_CODE_SUCCESS;
}
if (atomic_val_compare_exchange_32(&job->userFetch, 0, 1) != 0) {
qError("prior fetching not finished");
return TSDB_CODE_QRY_APP_ERROR;
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
if (job->status == JOB_TASK_STATUS_SUCCEED) {
if (job->status == JOB_TASK_STATUS_PARTIAL_SUCCEED) {
SCH_ERR_JRET(schFetchFromRemote(job));
}
......@@ -766,7 +843,7 @@ int32_t scheduleFetchRows(void *pJob, void **data) {
_return:
atomic_val_compare_exchange_32(&job->userFetch, 1, 0);
return code;
SCH_RET(code);
}
int32_t scheduleCancelJob(void *pJob) {
......
......@@ -26,72 +26,72 @@
#include "taos.h"
#include "tdef.h"
#include "tvariant.h"
#include "catalog.h"
#include "scheduler.h"
#include "catalog.h"
#include "scheduler.h"
#include "tep.h"
#include "trpc.h"
namespace {
void mockBuildDag(SQueryDag *dag) {
uint64_t qId = 0x111111111111;
dag->queryId = qId;
dag->numOfSubplans = 2;
dag->pSubplans = taosArrayInit(dag->numOfSubplans, POINTER_BYTES);
SArray *scan = taosArrayInit(1, sizeof(SSubplan));
SArray *merge = taosArrayInit(1, sizeof(SSubplan));
SSubplan scanPlan = {0};
SSubplan mergePlan = {0};
scanPlan.id.queryId = qId;
scanPlan.id.templateId = 0x2222222222;
scanPlan.id.subplanId = 0x3333333333;
scanPlan.type = QUERY_TYPE_SCAN;
scanPlan.level = 1;
scanPlan.execEpSet.numOfEps = 1;
scanPlan.pChildern = NULL;
scanPlan.pParents = taosArrayInit(1, POINTER_BYTES);
mergePlan.id.queryId = qId;
mergePlan.id.templateId = 0x4444444444;
mergePlan.id.subplanId = 0x5555555555;
mergePlan.type = QUERY_TYPE_MERGE;
mergePlan.level = 0;
mergePlan.execEpSet.numOfEps = 1;
mergePlan.pChildern = taosArrayInit(1, POINTER_BYTES);
mergePlan.pParents = NULL;
SSubplan *mergePointer = (SSubplan *)taosArrayPush(merge, &mergePlan);
SSubplan *scanPointer = (SSubplan *)taosArrayPush(scan, &scanPlan);
taosArrayPush(mergePointer->pChildern, &scanPointer);
taosArrayPush(scanPointer->pParents, &mergePointer);
taosArrayPush(dag->pSubplans, &merge);
taosArrayPush(dag->pSubplans, &scan);
}
void mockBuildDag(SQueryDag *dag) {
uint64_t qId = 0x111111111111;
dag->queryId = qId;
dag->numOfSubplans = 2;
dag->pSubplans = taosArrayInit(dag->numOfSubplans, POINTER_BYTES);
SArray *scan = taosArrayInit(1, sizeof(SSubplan));
SArray *merge = taosArrayInit(1, sizeof(SSubplan));
SSubplan scanPlan = {0};
SSubplan mergePlan = {0};
scanPlan.id.queryId = qId;
scanPlan.id.templateId = 0x2222222222;
scanPlan.id.subplanId = 0x3333333333;
scanPlan.type = QUERY_TYPE_SCAN;
scanPlan.level = 1;
scanPlan.execEpSet.numOfEps = 1;
scanPlan.pChildern = NULL;
scanPlan.pParents = taosArrayInit(1, POINTER_BYTES);
mergePlan.id.queryId = qId;
mergePlan.id.templateId = 0x4444444444;
mergePlan.id.subplanId = 0x5555555555;
mergePlan.type = QUERY_TYPE_MERGE;
mergePlan.level = 0;
mergePlan.execEpSet.numOfEps = 1;
mergePlan.pChildern = taosArrayInit(1, POINTER_BYTES);
mergePlan.pParents = NULL;
SSubplan *mergePointer = (SSubplan *)taosArrayPush(merge, &mergePlan);
SSubplan *scanPointer = (SSubplan *)taosArrayPush(scan, &scanPlan);
taosArrayPush(mergePointer->pChildern, &scanPointer);
taosArrayPush(scanPointer->pParents, &mergePointer);
taosArrayPush(dag->pSubplans, &merge);
taosArrayPush(dag->pSubplans, &scan);
}
}
TEST(testCase, normalCase) {
void *mockPointer = (void *)0x1;
void *mockPointer = (void *)0x1;
char *clusterId = "cluster1";
char *dbname = "1.db1";
char *tablename = "table1";
SVgroupInfo vgInfo = {0};
void *pJob = NULL;
SQueryDag dag = {0};
SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr));
int32_t code = schedulerInit(NULL);
SVgroupInfo vgInfo = {0};
void *pJob = NULL;
SQueryDag dag = {0};
SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr));
int32_t code = schedulerInit(NULL);
ASSERT_EQ(code, 0);
mockBuildDag(&dag);
code = scheduleExecJob(mockPointer, qnodeList, &dag, &pJob);
mockBuildDag(&dag);
code = scheduleAsyncExecJob(mockPointer, qnodeList, &dag, &pJob);
ASSERT_EQ(code, 0);
}
}
int main(int argc, char** argv) {
......@@ -101,4 +101,4 @@ int main(int argc, char** argv) {
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册