提交 250e5152 编写于 作者: D dapan

feature/qnode

上级 13d44067
...@@ -943,6 +943,11 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -943,6 +943,11 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
qError("invalid query msg"); qError("invalid query msg");
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
msg->schedulerId = htobe64(msg->schedulerId);
msg->queryId = htobe64(msg->queryId);
msg->taskId = htobe64(msg->taskId);
msg->contentLen = ntohl(msg->contentLen);
bool queryDone = false; bool queryDone = false;
bool queryRsp = false; bool queryRsp = false;
......
...@@ -8,7 +8,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) ...@@ -8,7 +8,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(qworkerTest ${SOURCE_LIST}) ADD_EXECUTABLE(qworkerTest ${SOURCE_LIST})
TARGET_LINK_LIBRARIES( TARGET_LINK_LIBRARIES(
qworkerTest qworkerTest
PUBLIC os util common transport gtest qcom PUBLIC os util common transport gtest qcom planner qworker
) )
TARGET_INCLUDE_DIRECTORIES( TARGET_INCLUDE_DIRECTORIES(
......
...@@ -28,40 +28,89 @@ ...@@ -28,40 +28,89 @@
#include "tvariant.h" #include "tvariant.h"
#include "tep.h" #include "tep.h"
#include "trpc.h" #include "trpc.h"
#include "planner.h"
#include "qworker.h"
#include "stub.h" #include "stub.h"
#include "addr_any.h" #include "addr_any.h"
namespace { namespace {
int32_t qwtStringToPlan(const char* str, SSubplan** subplan) {
return 0;
} }
void __rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
SUseDbRsp *rspMsg = NULL; //todo
return;
}
void stubSetStringToPlan() {
void initTestEnv() {
static Stub stub; static Stub stub;
stub.set(rpcSendRecv, __rpcSendRecv); stub.set(qStringToSubplan, qwtStringToPlan);
{ {
AddrAny any("libtransport.so"); AddrAny any("libplanner.so");
std::map<std::string,void*> result; std::map<std::string,void*> result;
any.get_global_func_addr_dynsym("^rpcSendRecv$", result); any.get_global_func_addr_dynsym("^qStringToSubplan$", result);
for (const auto& f : result) { for (const auto& f : result) {
stub.set(f.second, __rpcSendRecv); stub.set(f.second, qwtStringToPlan);
} }
} }
} }
TEST(testCase, normalCase) {
} }
TEST(testCase, normalCase) {
void *mgmt = NULL;
int32_t code = 0;
void *mockPointer = (void *)0x1;
SRpcMsg queryRpc = {0};
SRpcMsg readyRpc = {0};
SRpcMsg fetchRpc = {0};
SRpcMsg dropRpc = {0};
SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100);
queryMsg->queryId = htobe64(1);
queryMsg->schedulerId = htobe64(1);
queryMsg->taskId = htobe64(1);
queryMsg->contentLen = htonl(100);
queryRpc.pCont = queryMsg;
SResReadyMsg readyMsg = {0};
readyMsg.schedulerId = htobe64(1);
readyMsg.queryId = htobe64(1);
readyMsg.taskId = htobe64(1);
readyRpc.pCont = &readyMsg;
SResFetchMsg fetchMsg = {0};
fetchMsg.schedulerId = htobe64(1);
fetchMsg.queryId = htobe64(1);
fetchMsg.taskId = htobe64(1);
fetchRpc.pCont = &fetchMsg;
STaskDropMsg dropMsg = {0};
dropMsg.schedulerId = htobe64(1);
dropMsg.queryId = htobe64(1);
dropMsg.taskId = htobe64(1);
dropRpc.pCont = &dropMsg;
stubSetStringToPlan();
code = qWorkerInit(NULL, &mgmt);
ASSERT_EQ(code, 0);
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
ASSERT_EQ(code, 0);
code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc);
ASSERT_EQ(code, 0);
code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc);
ASSERT_EQ(code, 0);
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc);
ASSERT_EQ(code, 0);
}
int main(int argc, char** argv) { int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv); testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS(); return RUN_ALL_TESTS();
......
...@@ -43,7 +43,7 @@ typedef struct SSchedulerMgmt { ...@@ -43,7 +43,7 @@ typedef struct SSchedulerMgmt {
SHashObj *jobs; // key: queryId, value: SQueryJob* SHashObj *jobs; // key: queryId, value: SQueryJob*
} SSchedulerMgmt; } SSchedulerMgmt;
typedef struct SQueryLevel { typedef struct SSchLevel {
int32_t level; int32_t level;
int8_t status; int8_t status;
SRWLatch lock; SRWLatch lock;
...@@ -51,12 +51,12 @@ typedef struct SQueryLevel { ...@@ -51,12 +51,12 @@ typedef struct SQueryLevel {
int32_t taskSucceed; int32_t taskSucceed;
int32_t taskNum; int32_t taskNum;
SArray *subTasks; // Element is SQueryTask SArray *subTasks; // Element is SQueryTask
} SQueryLevel; } SSchLevel;
typedef struct SQueryTask { typedef struct SSchTask {
uint64_t taskId; // task id uint64_t taskId; // task id
SQueryLevel *level; // level SSchLevel *level; // level
SSubplan *plan; // subplan SSubplan *plan; // subplan
char *msg; // operator tree char *msg; // operator tree
int32_t msgLen; // msg length int32_t msgLen; // msg length
...@@ -66,19 +66,20 @@ typedef struct SQueryTask { ...@@ -66,19 +66,20 @@ typedef struct SQueryTask {
int32_t childReady; // child task ready number int32_t childReady; // child task ready number
SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask* SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask*
SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask* SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask*
} SQueryTask; } SSchTask;
typedef struct SQueryJobAttr { typedef struct SSchJobAttr {
bool needFetch; bool needFetch;
bool syncQuery; bool syncSchedule;
} SQueryJobAttr; bool queryJob;
} SSchJobAttr;
typedef struct SQueryJob { typedef struct SSchJob {
uint64_t queryId; uint64_t queryId;
int32_t levelNum; int32_t levelNum;
int32_t levelIdx; int32_t levelIdx;
int8_t status; int8_t status;
SQueryJobAttr attr; SSchJobAttr attr;
SQueryProfileSummary summary; SQueryProfileSummary summary;
SEpSet dataSrcEps; SEpSet dataSrcEps;
SEpAddr resEp; SEpAddr resEp;
...@@ -88,7 +89,7 @@ typedef struct SQueryJob { ...@@ -88,7 +89,7 @@ typedef struct SQueryJob {
int32_t userFetch; int32_t userFetch;
int32_t remoteFetch; int32_t remoteFetch;
SQueryTask *fetchTask; SSchTask *fetchTask;
int32_t errCode; int32_t errCode;
void *res; void *res;
int32_t resNumOfRows; int32_t resNumOfRows;
...@@ -99,7 +100,7 @@ typedef struct SQueryJob { ...@@ -99,7 +100,7 @@ typedef struct SQueryJob {
SArray *levels; // Element is SQueryLevel, starting from 0. SArray *levels; // Element is SQueryLevel, starting from 0.
SArray *subPlans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0. SArray *subPlans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0.
} SQueryJob; } SSchJob;
#define SCH_HAS_QNODE_IN_CLUSTER(type) (false) //TODO CLUSTER TYPE #define SCH_HAS_QNODE_IN_CLUSTER(type) (false) //TODO CLUSTER TYPE
#define SCH_TASK_READY_TO_LUNCH(task) ((task)->childReady >= taosArrayGetSize((task)->children)) // MAY NEED TO ENHANCE #define SCH_TASK_READY_TO_LUNCH(task) ((task)->childReady >= taosArrayGetSize((task)->children)) // MAY NEED TO ENHANCE
...@@ -118,7 +119,7 @@ typedef struct SQueryJob { ...@@ -118,7 +119,7 @@ typedef struct SQueryJob {
#define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock)) #define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock))
extern int32_t schLaunchTask(SQueryJob *job, SQueryTask *task); extern int32_t schLaunchTask(SSchJob *job, SSchTask *task);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -51,12 +51,12 @@ int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_ ...@@ -51,12 +51,12 @@ int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_
*/ */
} }
int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) { int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) {
for (int32_t i = 0; i < job->levelNum; ++i) { for (int32_t i = 0; i < job->levelNum; ++i) {
SQueryLevel *level = taosArrayGet(job->levels, i); SSchLevel *level = taosArrayGet(job->levels, i);
for (int32_t m = 0; m < level->taskNum; ++m) { for (int32_t m = 0; m < level->taskNum; ++m) {
SQueryTask *task = taosArrayGet(level->subTasks, m); SSchTask *task = taosArrayGet(level->subTasks, m);
SSubplan *plan = task->plan; SSubplan *plan = task->plan;
int32_t childNum = plan->pChildern ? (int32_t)taosArrayGetSize(plan->pChildern) : 0; int32_t childNum = plan->pChildern ? (int32_t)taosArrayGetSize(plan->pChildern) : 0;
int32_t parentNum = plan->pParents ? (int32_t)taosArrayGetSize(plan->pParents) : 0; int32_t parentNum = plan->pParents ? (int32_t)taosArrayGetSize(plan->pParents) : 0;
...@@ -71,7 +71,7 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) { ...@@ -71,7 +71,7 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) {
for (int32_t n = 0; n < childNum; ++n) { for (int32_t n = 0; n < childNum; ++n) {
SSubplan **child = taosArrayGet(plan->pChildern, n); SSubplan **child = taosArrayGet(plan->pChildern, n);
SQueryTask **childTask = taosHashGet(planToTask, child, POINTER_BYTES); SSchTask **childTask = taosHashGet(planToTask, child, POINTER_BYTES);
if (NULL == childTask || NULL == *childTask) { if (NULL == childTask || NULL == *childTask) {
qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n); qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
...@@ -93,7 +93,7 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) { ...@@ -93,7 +93,7 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) {
for (int32_t n = 0; n < parentNum; ++n) { for (int32_t n = 0; n < parentNum; ++n) {
SSubplan **parent = taosArrayGet(plan->pParents, n); SSubplan **parent = taosArrayGet(plan->pParents, n);
SQueryTask **parentTask = taosHashGet(planToTask, parent, POINTER_BYTES); SSchTask **parentTask = taosHashGet(planToTask, parent, POINTER_BYTES);
if (NULL == parentTask || NULL == *parentTask) { if (NULL == parentTask || NULL == *parentTask) {
qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n); qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
...@@ -107,13 +107,13 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) { ...@@ -107,13 +107,13 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) {
} }
} }
SQueryLevel *level = taosArrayGet(job->levels, 0); SSchLevel *level = taosArrayGet(job->levels, 0);
if (level->taskNum > 1) { if (job->attr.queryJob && level->taskNum > 1) {
qError("invalid plan info, level 0, taskNum:%d", level->taskNum); qError("invalid plan info, level 0, taskNum:%d", level->taskNum);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
} }
SQueryTask *task = taosArrayGet(level->subTasks, 0); SSchTask *task = taosArrayGet(level->subTasks, 0);
if (task->parents && taosArrayGetSize(task->parents) > 0) { if (task->parents && taosArrayGetSize(task->parents) > 0) {
qError("invalid plan info, level 0, parentNum:%d", (int32_t)taosArrayGetSize(task->parents)); qError("invalid plan info, level 0, parentNum:%d", (int32_t)taosArrayGetSize(task->parents));
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
...@@ -124,7 +124,7 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) { ...@@ -124,7 +124,7 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) {
} }
int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) {
int32_t code = 0; int32_t code = 0;
job->queryId = dag->queryId; job->queryId = dag->queryId;
...@@ -146,7 +146,7 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { ...@@ -146,7 +146,7 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
job->levels = taosArrayInit(levelNum, sizeof(SQueryLevel)); job->levels = taosArrayInit(levelNum, sizeof(SSchLevel));
if (NULL == job->levels) { if (NULL == job->levels) {
qError("taosArrayInit %d failed", levelNum); qError("taosArrayInit %d failed", levelNum);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
...@@ -159,10 +159,10 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { ...@@ -159,10 +159,10 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
job->subPlans = dag->pSubplans; job->subPlans = dag->pSubplans;
SQueryLevel level = {0}; SSchLevel level = {0};
SArray *levelPlans = NULL; SArray *levelPlans = NULL;
int32_t levelPlanNum = 0; int32_t levelPlanNum = 0;
SQueryLevel *pLevel = NULL; SSchLevel *pLevel = NULL;
level.status = JOB_TASK_STATUS_NOT_START; level.status = JOB_TASK_STATUS_NOT_START;
...@@ -189,7 +189,7 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { ...@@ -189,7 +189,7 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
pLevel->taskNum = levelPlanNum; pLevel->taskNum = levelPlanNum;
pLevel->subTasks = taosArrayInit(levelPlanNum, sizeof(SQueryTask)); pLevel->subTasks = taosArrayInit(levelPlanNum, sizeof(SSchTask));
if (NULL == pLevel->subTasks) { if (NULL == pLevel->subTasks) {
qError("taosArrayInit %d failed", levelPlanNum); qError("taosArrayInit %d failed", levelPlanNum);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
...@@ -197,11 +197,14 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { ...@@ -197,11 +197,14 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
for (int32_t n = 0; n < levelPlanNum; ++n) { for (int32_t n = 0; n < levelPlanNum; ++n) {
SSubplan *plan = taosArrayGet(levelPlans, n); SSubplan *plan = taosArrayGet(levelPlans, n);
SQueryTask task = {0}; SSchTask task = {0};
if (plan->type == QUERY_TYPE_MODIFY) { if (plan->type == QUERY_TYPE_MODIFY) {
job->attr.needFetch = false; job->attr.needFetch = false;
} else {
job->attr.queryJob = true;
} }
task.taskId = atomic_add_fetch_64(&schMgmt.taskId, 1); task.taskId = atomic_add_fetch_64(&schMgmt.taskId, 1);
task.plan = plan; task.plan = plan;
...@@ -242,7 +245,7 @@ _return: ...@@ -242,7 +245,7 @@ _return:
SCH_RET(code); SCH_RET(code);
} }
int32_t schSetTaskExecEpSet(SQueryJob *job, SEpSet *epSet) { int32_t schSetTaskExecEpSet(SSchJob *job, SEpSet *epSet) {
if (epSet->numOfEps >= SCH_MAX_CONDIDATE_EP_NUM) { if (epSet->numOfEps >= SCH_MAX_CONDIDATE_EP_NUM) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -269,7 +272,7 @@ int32_t schSetTaskExecEpSet(SQueryJob *job, SEpSet *epSet) { ...@@ -269,7 +272,7 @@ int32_t schSetTaskExecEpSet(SQueryJob *job, SEpSet *epSet) {
} }
int32_t schPushTaskToExecList(SQueryJob *job, SQueryTask *task) { int32_t schPushTaskToExecList(SSchJob *job, SSchTask *task) {
if (0 != taosHashPut(job->execTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) { if (0 != taosHashPut(job->execTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) {
qError("taosHashPut failed"); qError("taosHashPut failed");
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
...@@ -278,7 +281,7 @@ int32_t schPushTaskToExecList(SQueryJob *job, SQueryTask *task) { ...@@ -278,7 +281,7 @@ int32_t schPushTaskToExecList(SQueryJob *job, SQueryTask *task) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schMoveTaskToSuccList(SQueryJob *job, SQueryTask *task, bool *moved) { int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) {
if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) { if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) {
qWarn("remove task[%"PRIx64"] from execTasks failed", task->taskId); qWarn("remove task[%"PRIx64"] from execTasks failed", task->taskId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -294,7 +297,7 @@ int32_t schMoveTaskToSuccList(SQueryJob *job, SQueryTask *task, bool *moved) { ...@@ -294,7 +297,7 @@ int32_t schMoveTaskToSuccList(SQueryJob *job, SQueryTask *task, bool *moved) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schMoveTaskToFailList(SQueryJob *job, SQueryTask *task, bool *moved) { int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) {
if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) { if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) {
qWarn("remove task[%"PRIx64"] from execTasks failed, it may not exist", task->taskId); qWarn("remove task[%"PRIx64"] from execTasks failed, it may not exist", task->taskId);
} }
...@@ -310,7 +313,7 @@ int32_t schMoveTaskToFailList(SQueryJob *job, SQueryTask *task, bool *moved) { ...@@ -310,7 +313,7 @@ int32_t schMoveTaskToFailList(SQueryJob *job, SQueryTask *task, bool *moved) {
} }
int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { int32_t schAsyncSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
int32_t msgSize = 0; int32_t msgSize = 0;
void *msg = NULL; void *msg = NULL;
...@@ -404,7 +407,7 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { ...@@ -404,7 +407,7 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schTaskCheckAndSetRetry(SQueryJob *job, SQueryTask *task, int32_t errCode, bool *needRetry) { int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, bool *needRetry) {
// TODO set retry or not based on task type/errCode/retry times/job status/available eps... // TODO set retry or not based on task type/errCode/retry times/job status/available eps...
// TODO if needRetry, set task retry info // TODO if needRetry, set task retry info
...@@ -414,7 +417,7 @@ int32_t schTaskCheckAndSetRetry(SQueryJob *job, SQueryTask *task, int32_t errCod ...@@ -414,7 +417,7 @@ int32_t schTaskCheckAndSetRetry(SQueryJob *job, SQueryTask *task, int32_t errCod
} }
int32_t schFetchFromRemote(SQueryJob *job) { int32_t schFetchFromRemote(SSchJob *job) {
int32_t code = 0; int32_t code = 0;
if (atomic_val_compare_exchange_32(&job->remoteFetch, 0, 1) != 0) { if (atomic_val_compare_exchange_32(&job->remoteFetch, 0, 1) != 0) {
...@@ -433,10 +436,10 @@ _return: ...@@ -433,10 +436,10 @@ _return:
} }
int32_t schProcessOnJobPartialSuccess(SQueryJob *job) { int32_t schProcessOnJobPartialSuccess(SSchJob *job) {
job->status = JOB_TASK_STATUS_PARTIAL_SUCCEED; job->status = JOB_TASK_STATUS_PARTIAL_SUCCEED;
if ((!job->attr.needFetch) && job->attr.syncQuery) { if ((!job->attr.needFetch) && job->attr.syncSchedule) {
tsem_post(&job->rspSem); tsem_post(&job->rspSem);
} }
...@@ -447,27 +450,27 @@ int32_t schProcessOnJobPartialSuccess(SQueryJob *job) { ...@@ -447,27 +450,27 @@ int32_t schProcessOnJobPartialSuccess(SQueryJob *job) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schProcessOnJobFailure(SQueryJob *job, int32_t errCode) { int32_t schProcessOnJobFailure(SSchJob *job, int32_t errCode) {
job->status = JOB_TASK_STATUS_FAILED; job->status = JOB_TASK_STATUS_FAILED;
job->errCode = errCode; job->errCode = errCode;
atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0); atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);
if (job->userFetch) { if (job->userFetch || ((!job->attr.needFetch) && job->attr.syncSchedule)) {
tsem_post(&job->rspSem); tsem_post(&job->rspSem);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schProcessOnDataFetched(SQueryJob *job) { int32_t schProcessOnDataFetched(SSchJob *job) {
atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0); atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);
tsem_post(&job->rspSem); tsem_post(&job->rspSem);
} }
int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) {
bool moved = false; bool moved = false;
SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved)); SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved));
...@@ -489,7 +492,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { ...@@ -489,7 +492,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) {
if (SCH_TASK_NEED_WAIT_ALL(task)) { if (SCH_TASK_NEED_WAIT_ALL(task)) {
SCH_LOCK(SCH_WRITE, &task->level->lock); SCH_LOCK(SCH_WRITE, &task->level->lock);
task->level->taskFailed++; task->level->taskSucceed++;
taskDone = task->level->taskSucceed + task->level->taskFailed; taskDone = task->level->taskSucceed + task->level->taskFailed;
SCH_UNLOCK(SCH_WRITE, &task->level->lock); SCH_UNLOCK(SCH_WRITE, &task->level->lock);
...@@ -524,7 +527,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { ...@@ -524,7 +527,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) {
} }
for (int32_t i = 0; i < parentNum; ++i) { for (int32_t i = 0; i < parentNum; ++i) {
SQueryTask *par = *(SQueryTask **)taosArrayGet(task->parents, i); SSchTask *par = *(SSchTask **)taosArrayGet(task->parents, i);
++par->childReady; ++par->childReady;
...@@ -538,7 +541,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { ...@@ -538,7 +541,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCode) { int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) {
bool needRetry = false; bool needRetry = false;
bool moved = false; bool moved = false;
int32_t taskDone = 0; int32_t taskDone = 0;
...@@ -575,7 +578,7 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod ...@@ -575,7 +578,7 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) { int32_t schHandleRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
switch (msgType) { switch (msgType) {
...@@ -584,7 +587,7 @@ int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, char ...@@ -584,7 +587,7 @@ int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, char
if (rsp->code != TSDB_CODE_SUCCESS) { if (rsp->code != TSDB_CODE_SUCCESS) {
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code)); SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code));
} else { } else {
job->resNumOfRows += rsp->numOfRows; job->resNumOfRows += rsp->affectedRows;
code = schProcessOnTaskSuccess(job, task); code = schProcessOnTaskSuccess(job, task);
if (code) { if (code) {
...@@ -648,7 +651,7 @@ _return: ...@@ -648,7 +651,7 @@ _return:
int32_t schLaunchTask(SQueryJob *job, SQueryTask *task) { int32_t schLaunchTask(SSchJob *job, SSchTask *task) {
SSubplan *plan = task->plan; SSubplan *plan = task->plan;
SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &task->msgLen)); SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &task->msgLen));
if (plan->execEpSet.numOfEps <= 0) { if (plan->execEpSet.numOfEps <= 0) {
...@@ -671,10 +674,10 @@ int32_t schLaunchTask(SQueryJob *job, SQueryTask *task) { ...@@ -671,10 +674,10 @@ int32_t schLaunchTask(SQueryJob *job, SQueryTask *task) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schLaunchJob(SQueryJob *job) { int32_t schLaunchJob(SSchJob *job) {
SQueryLevel *level = taosArrayGet(job->levels, job->levelIdx); SSchLevel *level = taosArrayGet(job->levels, job->levelIdx);
for (int32_t i = 0; i < level->taskNum; ++i) { for (int32_t i = 0; i < level->taskNum; ++i) {
SQueryTask *task = taosArrayGet(level->subTasks, i); SSchTask *task = taosArrayGet(level->subTasks, i);
SCH_ERR_RET(schLaunchTask(job, task)); SCH_ERR_RET(schLaunchTask(job, task));
} }
...@@ -683,10 +686,10 @@ int32_t schLaunchJob(SQueryJob *job) { ...@@ -683,10 +686,10 @@ int32_t schLaunchJob(SQueryJob *job) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void schDropJobAllTasks(SQueryJob *job) { void schDropJobAllTasks(SSchJob *job) {
void *pIter = taosHashIterate(job->succTasks, NULL); void *pIter = taosHashIterate(job->succTasks, NULL);
while (pIter) { while (pIter) {
SQueryTask *task = *(SQueryTask **)pIter; SSchTask *task = *(SSchTask **)pIter;
schAsyncSendMsg(job, task, TSDB_MSG_TYPE_DROP_TASK); schAsyncSendMsg(job, task, TSDB_MSG_TYPE_DROP_TASK);
...@@ -695,7 +698,7 @@ void schDropJobAllTasks(SQueryJob *job) { ...@@ -695,7 +698,7 @@ void schDropJobAllTasks(SQueryJob *job) {
pIter = taosHashIterate(job->failTasks, NULL); pIter = taosHashIterate(job->failTasks, NULL);
while (pIter) { while (pIter) {
SQueryTask *task = *(SQueryTask **)pIter; SSchTask *task = *(SSchTask **)pIter;
schAsyncSendMsg(job, task, TSDB_MSG_TYPE_DROP_TASK); schAsyncSendMsg(job, task, TSDB_MSG_TYPE_DROP_TASK);
...@@ -721,7 +724,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { ...@@ -721,7 +724,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
} }
int32_t scheduleExecJobImpl(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob, bool syncQuery) { int32_t scheduleExecJobImpl(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob, bool syncSchedule) {
if (NULL == transport || NULL == transport ||NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { if (NULL == transport || NULL == transport ||NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
...@@ -731,12 +734,12 @@ int32_t scheduleExecJobImpl(void *transport, SArray *qnodeList, SQueryDag* pDag, ...@@ -731,12 +734,12 @@ int32_t scheduleExecJobImpl(void *transport, SArray *qnodeList, SQueryDag* pDag,
} }
int32_t code = 0; int32_t code = 0;
SQueryJob *job = calloc(1, sizeof(SQueryJob)); SSchJob *job = calloc(1, sizeof(SSchJob));
if (NULL == job) { if (NULL == job) {
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
job->attr.syncQuery = syncQuery; job->attr.syncSchedule = syncSchedule;
job->transport = transport; job->transport = transport;
job->qnodeList = qnodeList; job->qnodeList = qnodeList;
...@@ -777,9 +780,9 @@ int32_t scheduleExecJobImpl(void *transport, SArray *qnodeList, SQueryDag* pDag, ...@@ -777,9 +780,9 @@ int32_t scheduleExecJobImpl(void *transport, SArray *qnodeList, SQueryDag* pDag,
SCH_ERR_JRET(schLaunchJob(job)); SCH_ERR_JRET(schLaunchJob(job));
*(SQueryJob **)pJob = job; *(SSchJob **)pJob = job;
if (syncQuery) { if (syncSchedule) {
tsem_wait(&job->rspSem); tsem_wait(&job->rspSem);
} }
...@@ -787,7 +790,7 @@ int32_t scheduleExecJobImpl(void *transport, SArray *qnodeList, SQueryDag* pDag, ...@@ -787,7 +790,7 @@ int32_t scheduleExecJobImpl(void *transport, SArray *qnodeList, SQueryDag* pDag,
_return: _return:
*(SQueryJob **)pJob = NULL; *(SSchJob **)pJob = NULL;
scheduleFreeJob(job); scheduleFreeJob(job);
SCH_RET(code); SCH_RET(code);
...@@ -798,7 +801,7 @@ int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, voi ...@@ -798,7 +801,7 @@ int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, voi
SCH_ERR_RET(scheduleExecJobImpl(transport, qnodeList, pDag, pJob, true)); SCH_ERR_RET(scheduleExecJobImpl(transport, qnodeList, pDag, pJob, true));
SQueryJob *job = *(SQueryJob **)pJob; SSchJob *job = *(SSchJob **)pJob;
*numOfRows = job->resNumOfRows; *numOfRows = job->resNumOfRows;
...@@ -815,7 +818,7 @@ int32_t scheduleFetchRows(void *pJob, void **data) { ...@@ -815,7 +818,7 @@ int32_t scheduleFetchRows(void *pJob, void **data) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
SQueryJob *job = pJob; SSchJob *job = pJob;
int32_t code = 0; int32_t code = 0;
if (!job->attr.needFetch) { if (!job->attr.needFetch) {
...@@ -874,7 +877,7 @@ void scheduleFreeJob(void *pJob) { ...@@ -874,7 +877,7 @@ void scheduleFreeJob(void *pJob) {
return; return;
} }
SQueryJob *job = pJob; SSchJob *job = pJob;
if (job->status > 0) { if (job->status > 0) {
if (0 != taosHashRemove(schMgmt.jobs, &job->queryId, sizeof(job->queryId))) { if (0 != taosHashRemove(schMgmt.jobs, &job->queryId, sizeof(job->queryId))) {
......
...@@ -36,9 +36,9 @@ ...@@ -36,9 +36,9 @@
namespace { namespace {
extern "C" int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode); extern "C" int32_t schHandleRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode);
void schtBuildDag(SQueryDag *dag) { void schtBuildQueryDag(SQueryDag *dag) {
uint64_t qId = 0x0000000000000001; uint64_t qId = 0x0000000000000001;
dag->queryId = qId; dag->queryId = qId;
...@@ -82,6 +82,50 @@ void schtBuildDag(SQueryDag *dag) { ...@@ -82,6 +82,50 @@ void schtBuildDag(SQueryDag *dag) {
taosArrayPush(dag->pSubplans, &scan); taosArrayPush(dag->pSubplans, &scan);
} }
void schtBuildInsertDag(SQueryDag *dag) {
uint64_t qId = 0x0000000000000002;
dag->queryId = qId;
dag->numOfSubplans = 2;
dag->pSubplans = taosArrayInit(1, POINTER_BYTES);
SArray *inserta = taosArrayInit(dag->numOfSubplans, sizeof(SSubplan));
SSubplan insertPlan[2] = {0};
insertPlan[0].id.queryId = qId;
insertPlan[0].id.templateId = 0x0000000000000003;
insertPlan[0].id.subplanId = 0x0000000000000004;
insertPlan[0].type = QUERY_TYPE_MODIFY;
insertPlan[0].level = 0;
insertPlan[0].execEpSet.numOfEps = 1;
insertPlan[0].execEpSet.port[0] = 6030;
strcpy(insertPlan[0].execEpSet.fqdn[0], "ep0");
insertPlan[0].pChildern = NULL;
insertPlan[0].pParents = NULL;
insertPlan[0].pNode = NULL;
insertPlan[0].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink));
insertPlan[1].id.queryId = qId;
insertPlan[1].id.templateId = 0x0000000000000003;
insertPlan[1].id.subplanId = 0x0000000000000005;
insertPlan[1].type = QUERY_TYPE_MODIFY;
insertPlan[1].level = 0;
insertPlan[1].execEpSet.numOfEps = 1;
insertPlan[1].execEpSet.port[0] = 6030;
strcpy(insertPlan[1].execEpSet.fqdn[0], "ep1");
insertPlan[1].pChildern = NULL;
insertPlan[1].pParents = NULL;
insertPlan[1].pNode = NULL;
insertPlan[1].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink));
taosArrayPush(inserta, &insertPlan[0]);
taosArrayPush(inserta, &insertPlan[1]);
taosArrayPush(dag->pSubplans, &inserta);
}
int32_t schtPlanToString(const SSubplan *subplan, char** str, int32_t* len) { int32_t schtPlanToString(const SSubplan *subplan, char** str, int32_t* len) {
*str = (char *)calloc(1, 20); *str = (char *)calloc(1, 20);
*len = 20; *len = 20;
...@@ -119,6 +163,35 @@ void schtSetExecNode() { ...@@ -119,6 +163,35 @@ void schtSetExecNode() {
} }
} }
void *schtSendRsp(void *param) {
SSchJob *job = NULL;
int32_t code = 0;
while (true) {
job = *(SSchJob **)param;
if (job) {
break;
}
usleep(1000);
}
void *pIter = taosHashIterate(job->execTasks, NULL);
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
SShellSubmitRspMsg rsp = {0};
rsp.affectedRows = 10;
schHandleRspMsg(job, task, TSDB_MSG_TYPE_SUBMIT, (char *)&rsp, sizeof(rsp), 0);
pIter = taosHashIterate(job->execTasks, pIter);
}
return NULL;
}
void *pInsertJob = NULL;
} }
...@@ -140,7 +213,7 @@ TEST(queryTest, normalCase) { ...@@ -140,7 +213,7 @@ TEST(queryTest, normalCase) {
int32_t code = schedulerInit(NULL); int32_t code = schedulerInit(NULL);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
schtBuildDag(&dag); schtBuildQueryDag(&dag);
schtSetPlanToString(); schtSetPlanToString();
schtSetExecNode(); schtSetExecNode();
...@@ -148,10 +221,10 @@ TEST(queryTest, normalCase) { ...@@ -148,10 +221,10 @@ TEST(queryTest, normalCase) {
code = scheduleAsyncExecJob(mockPointer, qnodeList, &dag, &pJob); code = scheduleAsyncExecJob(mockPointer, qnodeList, &dag, &pJob);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
SQueryJob *job = (SQueryJob *)pJob; SSchJob *job = (SSchJob *)pJob;
void *pIter = taosHashIterate(job->execTasks, NULL); void *pIter = taosHashIterate(job->execTasks, NULL);
while (pIter) { while (pIter) {
SQueryTask *task = *(SQueryTask **)pIter; SSchTask *task = *(SSchTask **)pIter;
SQueryTableRsp rsp = {0}; SQueryTableRsp rsp = {0};
code = schHandleRspMsg(job, task, TSDB_MSG_TYPE_QUERY, (char *)&rsp, sizeof(rsp), 0); code = schHandleRspMsg(job, task, TSDB_MSG_TYPE_QUERY, (char *)&rsp, sizeof(rsp), 0);
...@@ -162,7 +235,7 @@ TEST(queryTest, normalCase) { ...@@ -162,7 +235,7 @@ TEST(queryTest, normalCase) {
pIter = taosHashIterate(job->execTasks, NULL); pIter = taosHashIterate(job->execTasks, NULL);
while (pIter) { while (pIter) {
SQueryTask *task = *(SQueryTask **)pIter; SSchTask *task = *(SSchTask **)pIter;
SResReadyRsp rsp = {0}; SResReadyRsp rsp = {0};
code = schHandleRspMsg(job, task, TSDB_MSG_TYPE_RES_READY, (char *)&rsp, sizeof(rsp), 0); code = schHandleRspMsg(job, task, TSDB_MSG_TYPE_RES_READY, (char *)&rsp, sizeof(rsp), 0);
...@@ -173,7 +246,7 @@ TEST(queryTest, normalCase) { ...@@ -173,7 +246,7 @@ TEST(queryTest, normalCase) {
pIter = taosHashIterate(job->execTasks, NULL); pIter = taosHashIterate(job->execTasks, NULL);
while (pIter) { while (pIter) {
SQueryTask *task = *(SQueryTask **)pIter; SSchTask *task = *(SSchTask **)pIter;
SQueryTableRsp rsp = {0}; SQueryTableRsp rsp = {0};
code = schHandleRspMsg(job, task, TSDB_MSG_TYPE_QUERY, (char *)&rsp, sizeof(rsp), 0); code = schHandleRspMsg(job, task, TSDB_MSG_TYPE_QUERY, (char *)&rsp, sizeof(rsp), 0);
...@@ -184,7 +257,7 @@ TEST(queryTest, normalCase) { ...@@ -184,7 +257,7 @@ TEST(queryTest, normalCase) {
pIter = taosHashIterate(job->execTasks, NULL); pIter = taosHashIterate(job->execTasks, NULL);
while (pIter) { while (pIter) {
SQueryTask *task = *(SQueryTask **)pIter; SSchTask *task = *(SSchTask **)pIter;
SResReadyRsp rsp = {0}; SResReadyRsp rsp = {0};
code = schHandleRspMsg(job, task, TSDB_MSG_TYPE_RES_READY, (char *)&rsp, sizeof(rsp), 0); code = schHandleRspMsg(job, task, TSDB_MSG_TYPE_RES_READY, (char *)&rsp, sizeof(rsp), 0);
...@@ -219,6 +292,46 @@ TEST(queryTest, normalCase) { ...@@ -219,6 +292,46 @@ TEST(queryTest, normalCase) {
} }
TEST(insertTest, normalCase) {
void *mockPointer = (void *)0x1;
char *clusterId = "cluster1";
char *dbname = "1.db1";
char *tablename = "table1";
SVgroupInfo vgInfo = {0};
SQueryDag dag = {0};
uint64_t numOfRows = 0;
SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr));
SEpAddr qnodeAddr = {0};
strcpy(qnodeAddr.fqdn, "qnode0.ep");
qnodeAddr.port = 6031;
taosArrayPush(qnodeList, &qnodeAddr);
int32_t code = schedulerInit(NULL);
ASSERT_EQ(code, 0);
schtBuildInsertDag(&dag);
schtSetPlanToString();
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_t thread1;
pthread_create(&(thread1), &thattr, schtSendRsp, &pInsertJob);
code = scheduleExecJob(mockPointer, qnodeList, &dag, &pInsertJob, &numOfRows);
ASSERT_EQ(code, 0);
ASSERT_EQ(numOfRows, 20);
scheduleFreeJob(pInsertJob);
}
int main(int argc, char** argv) { int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv); testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS(); return RUN_ALL_TESTS();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册