提交 876942b4 编写于 作者: D dapan1121

feature/qnode

上级 6677dfba
......@@ -36,11 +36,31 @@ enum {
SCH_WRITE,
};
typedef struct SSchApiStat {
} SSchApiStat;
typedef struct SSchRuntimeStat {
} SSchRuntimeStat;
typedef struct SSchJobStat {
} SSchJobStat;
typedef struct SSchedulerStat {
SSchApiStat api;
SSchRuntimeStat runtime;
SSchJobStat job;
} SSchedulerStat;
typedef struct SSchedulerMgmt {
uint64_t taskId; // sequential taksId
uint64_t sId; // schedulerId
SSchedulerCfg cfg;
SHashObj *jobs; // key: queryId, value: SQueryJob*
uint64_t taskId; // sequential taksId
uint64_t sId; // schedulerId
SSchedulerCfg cfg;
SHashObj *jobs; // key: queryId, value: SQueryJob*
SSchedulerStat stat;
} SSchedulerMgmt;
typedef struct SSchCallbackParam {
......
......@@ -1462,34 +1462,37 @@ void scheduleFreeJob(void *job) {
}
SSchJob *pJob = job;
uint64_t queryId = pJob->queryId;
if (0 != taosHashRemove(schMgmt.jobs, &pJob->queryId, sizeof(pJob->queryId))) {
SCH_JOB_ELOG("taosHashRemove job from list failed, may already freed, pJob:%p", pJob);
return;
}
if (SCH_GET_JOB_STATUS(pJob) > 0) {
if (0 != taosHashRemove(schMgmt.jobs, &pJob->queryId, sizeof(pJob->queryId))) {
SCH_JOB_ELOG("taosHashRemove job from list failed, may already freed, pJob:%p", pJob);
return;
}
schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_DROPPING);
schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_DROPPING);
SCH_JOB_DLOG("job removed from list, no further ref, ref:%d", atomic_load_32(&pJob->ref));
SCH_JOB_DLOG("job removed from list, no further ref, ref:%d", atomic_load_32(&pJob->ref));
while (true) {
int32_t ref = atomic_load_32(&pJob->ref);
if (0 == ref) {
break;
} else if (ref > 0) {
usleep(1);
} else {
assert(0);
while (true) {
int32_t ref = atomic_load_32(&pJob->ref);
if (0 == ref) {
break;
} else if (ref > 0) {
usleep(1);
} else {
assert(0);
}
}
}
SCH_JOB_DLOG("job no ref now, status:%d", SCH_GET_JOB_STATUS(pJob));
SCH_JOB_DLOG("job no ref now, status:%d", SCH_GET_JOB_STATUS(pJob));
if (pJob->status == JOB_TASK_STATUS_EXECUTING) {
schCancelJob(pJob);
}
if (pJob->status == JOB_TASK_STATUS_EXECUTING) {
schCancelJob(pJob);
}
schDropJobAllTasks(pJob);
schDropJobAllTasks(pJob);
}
pJob->subPlans = NULL; // it is a reference to pDag->pSubplans
......@@ -1515,6 +1518,8 @@ void scheduleFreeJob(void *job) {
tfree(pJob->res);
tfree(pJob);
qDebug("QID:%"PRIx64" job freed", queryId);
}
void schedulerDestroy(void) {
......
......@@ -79,6 +79,7 @@ void schtBuildQueryDag(SQueryDag *dag) {
scanPlan->level = 1;
scanPlan->pParents = taosArrayInit(1, POINTER_BYTES);
scanPlan->pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode));
scanPlan->msgType = TDMT_VND_QUERY;
mergePlan->id.queryId = qId;
mergePlan->id.templateId = 0x4444444444;
......@@ -89,6 +90,7 @@ void schtBuildQueryDag(SQueryDag *dag) {
mergePlan->pChildren = taosArrayInit(1, POINTER_BYTES);
mergePlan->pParents = NULL;
mergePlan->pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode));
mergePlan->msgType = TDMT_VND_QUERY;
SSubplan *mergePointer = (SSubplan *)taosArrayPush(merge, &mergePlan);
SSubplan *scanPointer = (SSubplan *)taosArrayPush(scan, &scanPlan);
......@@ -163,6 +165,11 @@ void schtExecNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) {
}
void schtRpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) {
}
void schtSetPlanToString() {
static Stub stub;
......@@ -190,6 +197,20 @@ void schtSetExecNode() {
}
}
void schtSetRpcSendRequest() {
static Stub stub;
stub.set(rpcSendRequest, schtRpcSendRequest);
{
AddrAny any("libtransport.so");
std::map<std::string,void*> result;
any.get_global_func_addr_dynsym("^rpcSendRequest$", result);
for (const auto& f : result) {
stub.set(f.second, schtRpcSendRequest);
}
}
}
void *schtSendRsp(void *param) {
SSchJob *job = NULL;
int32_t code = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册