From e95647970e89473952970c1217dc00d7007f77bf Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Mon, 23 May 2022 22:35:45 +0800 Subject: [PATCH] scheduler async api --- include/libs/scheduler/scheduler.h | 3 ++ source/libs/scheduler/inc/schedulerInt.h | 20 ++++++---- source/libs/scheduler/src/schJob.c | 48 ++++++++++++++++++----- source/libs/scheduler/src/schRemote.c | 42 ++++++++++---------- source/libs/scheduler/src/scheduler.c | 50 ++++++------------------ 5 files changed, 88 insertions(+), 75 deletions(-) diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index dcd058a293..62a97d1be0 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -64,6 +64,9 @@ typedef struct STaskInfo { SSubQueryMsg *msg; } STaskInfo; +typedef void (*schedulerCallback)(SQueryResult* pResult, void* param, int32_t code); + + int32_t schedulerInit(SSchedulerCfg *cfg); /** diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index ffac0f856d..d2ddd8b695 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -40,8 +40,8 @@ enum { }; typedef struct SSchTrans { - void *transInst; - void *transHandle; + void *pTrans; + void *pHandle; } SSchTrans; typedef struct SSchHbTrans { @@ -74,12 +74,17 @@ typedef struct SSchJobStat { } SSchJobStat; -typedef struct SSchedulerStat { +typedef struct SSchStat { SSchApiStat api; SSchRuntimeStat runtime; SSchJobStat job; -} SSchedulerStat; +} SSchStat; +typedef struct SSchResInfo { + SQueryResult queryRes; + schedulerCallback userFp; + void* userParam; +} SSchResInfo; typedef struct SSchedulerMgmt { uint64_t taskId; // sequential taksId @@ -89,7 +94,7 @@ typedef struct SSchedulerMgmt { bool exit; int32_t jobRef; int32_t jobNum; - SSchedulerStat stat; + SSchStat stat; SHashObj *hbConnections; } SSchedulerMgmt; @@ -170,7 +175,7 @@ typedef struct SSchJob { SSchJobAttr attr; int32_t levelNum; int32_t taskNum; - void *transport; + void *pTrans; SArray *nodeList; // qnode/vnode list, SArray SArray *levels; // starting from 0. SArray SNodeList *subPlans; // subplan pointer copied from DAG, no need to free it in scheduler @@ -196,6 +201,7 @@ typedef struct SSchJob { void *queryRes; void *resData; //TODO free it or not int32_t resNumOfRows; + SSchResInfo userRes; const char *sql; SQueryProfileSummary summary; } SSchJob; @@ -292,7 +298,7 @@ int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCo void schFreeRpcCtxVal(const void *arg); int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb); int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, void *handle); -int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, +int32_t schExecStaticExplainJob(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, bool syncSchedule); int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, int64_t startTs, bool sync); diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 14f4646397..a211d90a37 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -39,8 +39,8 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel * return TSDB_CODE_SUCCESS; } -int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray *pNodeList, const char *sql, - int64_t startTs, bool syncSchedule) { +int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *pTrans, SArray *pNodeList, const char *sql, + SSchResInfo *pRes, int64_t startTs, bool syncSchedule) { int32_t code = 0; int64_t refId = -1; SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob)); @@ -51,8 +51,9 @@ int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray pJob->attr.explainMode = pDag->explainInfo.mode; pJob->attr.syncSchedule = syncSchedule; - pJob->transport = transport; + pJob->pTrans = pTrans; pJob->sql = sql; + pJob->userRes = pRes; if (pNodeList != NULL) { pJob->nodeList = taosArrayDup(pNodeList); @@ -1228,8 +1229,8 @@ void schFreeJobImpl(void *job) { schCloseJobRef(); } -int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, - int64_t startTs, bool sync) { +int32_t schExecJobImpl(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, + SSchResInfo *pRes, int64_t startTs, bool sync) { qDebug("QID:0x%" PRIx64 " job started", pDag->queryId); if (pNodeList == NULL || taosArrayGetSize(pNodeList) <= 0) { @@ -1238,7 +1239,7 @@ int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int int32_t code = 0; SSchJob *pJob = NULL; - SCH_ERR_JRET(schInitJob(&pJob, pDag, transport, pNodeList, sql, startTs, sync)); + SCH_ERR_JRET(schInitJob(&pJob, pDag, pTrans, pNodeList, sql, pRes, startTs, sync)); SCH_ERR_JRET(schLaunchJob(pJob)); @@ -1261,8 +1262,36 @@ _return: SCH_RET(code); } -int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, - bool syncSchedule) { +int32_t schExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, + int64_t startTs, SSchResInfo *pRes, bool sync) { + int32_t code = 0; + + *pJob = 0; + + if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) { + SCH_ERR_RET(schExecStaticExplainJob(pTrans, pNodeList, pDag, pJob, sql, pRes, sync)); + } else { + SCH_ERR_JRET(schExecJobImpl(pTrans, pNodeList, pDag, pJob, sql, pRes, startTs, sync)); + } + +_return: + + if (*pJob) { + SSchJob *job = schAcquireJob(*pJob); + + pRes->code = atomic_load_32(&job->errCode); + pRes->numOfRows = job->resNumOfRows; + pRes->res = job->queryRes; + job->queryRes = NULL; + + schReleaseJob(*pJob); + } + + return code; +} + +int32_t schExecStaticExplainJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, + SSchResInfo *pRes, bool sync) { qDebug("QID:0x%" PRIx64 " job started", pDag->queryId); int32_t code = 0; @@ -1277,7 +1306,8 @@ int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDa pJob->attr.explainMode = pDag->explainInfo.mode; pJob->queryId = pDag->queryId; pJob->subPlans = pDag->pSubplans; - + pJob->userRes = pRes; + SCH_ERR_JRET(qExecStaticExplain(pDag, (SRetrieveTableRsp **)&pJob->resData)); int64_t refId = taosAddRef(schMgmt.jobRef, pJob); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 6d9f6b435f..247358af8f 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -481,7 +481,7 @@ int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, int32_t msgType, param->queryId = pJob->queryId; param->refId = pJob->refId; param->taskId = SCH_TASK_ID(pTask); - param->transport = pJob->transport; + param->transport = pJob->pTrans; msgSendInfo->param = param; msgSendInfo->fp = fp; @@ -556,7 +556,7 @@ int32_t schMakeHbCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) { param->nodeEpId.nodeId = addr->nodeId; memcpy(¶m->nodeEpId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp)); - param->transport = pJob->transport; + param->transport = pJob->pTrans; *pParam = param; @@ -638,7 +638,7 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) { SCH_ERR_JRET(schGetCallbackFp(TDMT_VND_QUERY_HEARTBEAT, &fp)); param->nodeEpId = epId; - param->transport = pJob->transport; + param->transport = pJob->pTrans; pMsgSendInfo->param = param; pMsgSendInfo->fp = fp; @@ -666,7 +666,7 @@ int32_t schRegisterHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId * int32_t code = 0; SSchHbTrans hb = {0}; - hb.trans.transInst = pJob->transport; + hb.trans.pTrans = pJob->pTrans; SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &hb.rpcCtx)); @@ -743,12 +743,12 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) { __async_send_cb_fn_t fp = NULL; SCH_ERR_JRET(schGetCallbackFp(msgType, &fp)); - param->transport = trans.transInst; + param->transport = trans.pTrans; pMsgSendInfo->param = param; pMsgSendInfo->msgInfo.pData = msg; pMsgSendInfo->msgInfo.len = msgSize; - pMsgSendInfo->msgInfo.handle = trans.transHandle; + pMsgSendInfo->msgInfo.handle = trans.pHandle; pMsgSendInfo->msgType = msgType; pMsgSendInfo->fp = fp; @@ -756,13 +756,13 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) { SEpSet epSet = {.inUse = 0, .numOfEps = 1}; memcpy(&epSet.eps[0], &nodeEpId->ep, sizeof(nodeEpId->ep)); - qDebug("start to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d", trans.transInst, trans.transHandle, + qDebug("start to send hb msg, pTrans:%p, pHandle:%p, fqdn:%s, port:%d", trans.pTrans, trans.pHandle, nodeEpId->ep.fqdn, nodeEpId->ep.port); - code = asyncSendMsgToServerExt(trans.transInst, &epSet, &transporterId, pMsgSendInfo, true, &rpcCtx); + code = asyncSendMsgToServerExt(trans.pTrans, &epSet, &transporterId, pMsgSendInfo, true, &rpcCtx); if (code) { - qError("fail to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d, error:%x - %s", trans.transInst, - trans.transHandle, nodeEpId->ep.fqdn, nodeEpId->ep.port, code, tstrerror(code)); + qError("fail to send hb msg, pTrans:%p, pHandle:%p, fqdn:%s, port:%d, error:%x - %s", trans.pTrans, + trans.pHandle, nodeEpId->ep.fqdn, nodeEpId->ep.port, code, tstrerror(code)); SCH_ERR_JRET(code); } @@ -812,8 +812,8 @@ int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) { memcpy(&hb->trans, trans, sizeof(*trans)); SCH_UNLOCK(SCH_WRITE, &hb->lock); - qDebug("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, instance:%p, handle:%p", schMgmt.sId, - epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->transInst, trans->transHandle); + qDebug("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, pTrans:%p, pHandle:%p", schMgmt.sId, + epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->pTrans, trans->pHandle); return TSDB_CODE_SUCCESS; } @@ -833,8 +833,8 @@ int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) { } SSchTrans trans = {0}; - trans.transInst = pParam->transport; - trans.transHandle = pMsg->handle; + trans.pTrans = pParam->transport; + trans.pHandle = pMsg->handle; SCH_ERR_JRET(schUpdateHbConnection(&rsp.epId, &trans)); @@ -879,7 +879,7 @@ int32_t schMakeCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) { param->queryId = pJob->queryId; param->refId = pJob->refId; param->taskId = SCH_TASK_ID(pTask); - param->transport = pJob->transport; + param->transport = pJob->pTrans; *pParam = param; @@ -1034,15 +1034,15 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet pMsgSendInfo->msgInfo.pData = msg; pMsgSendInfo->msgInfo.len = msgSize; - pMsgSendInfo->msgInfo.handle = trans->transHandle; + pMsgSendInfo->msgInfo.handle = trans->pHandle; pMsgSendInfo->msgType = msgType; - qDebug("start to send %s msg to node[%d,%s,%d], refId:%" PRIx64 "instance:%p, handle:%p", TMSG_INFO(msgType), + qDebug("start to send %s msg to node[%d,%s,%d], refId:%" PRIx64 "pTrans:%p, pHandle:%p", TMSG_INFO(msgType), ntohl(((SMsgHead *)msg)->vgId), epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, pJob->refId, - trans->transInst, trans->transHandle); + trans->pTrans, trans->pHandle); int64_t transporterId = 0; - code = asyncSendMsgToServerExt(trans->transInst, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx); + code = asyncSendMsgToServerExt(trans->pTrans, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx); if (code) { SCH_ERR_JRET(code); } @@ -1208,12 +1208,12 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, SCH_SET_TASK_LASTMSG_TYPE(pTask, msgType); - SSchTrans trans = {.transInst = pJob->transport, .transHandle = SCH_GET_TASK_HANDLE(pTask)}; + SSchTrans trans = {.transInst = pJob->pTrans, .transHandle = SCH_GET_TASK_HANDLE(pTask)}; SCH_ERR_JRET(schAsyncSendMsg(pJob, pTask, &trans, &epSet, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL))); if (msgType == TDMT_VND_QUERY) { - SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr, trans.transHandle)); + SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr, trans.pHandle)); } return TSDB_CODE_SUCCESS; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index bd2c7e5b49..de802465c2 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -67,50 +67,24 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { return TSDB_CODE_SUCCESS; } -int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, +int32_t schedulerExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, int64_t startTs, SQueryResult *pRes) { - if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) { + if (NULL == pTrans || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - int32_t code = 0; - - *pJob = 0; - - if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) { - SCH_ERR_RET(schExecStaticExplain(transport, nodeList, pDag, pJob, sql, true)); - } else { - SCH_ERR_JRET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, startTs, true)); - } - -_return: - - if (*pJob) { - SSchJob *job = schAcquireJob(*pJob); - - pRes->code = atomic_load_32(&job->errCode); - pRes->numOfRows = job->resNumOfRows; - pRes->res = job->queryRes; - job->queryRes = NULL; - - schReleaseJob(*pJob); - } - - return code; + SSchResInfo resInfo = {.queryRes = *pRes}; + SCH_RET(schExecJob(pTrans, pNodeList, pDag, pJob, sql, startTs, &resInfo, true)); } -int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan *pDag, const char *sql, int64_t *pJob) { - if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { - SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); - } - - if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) { - SCH_ERR_RET(schExecStaticExplain(transport, pNodeList, pDag, pJob, sql, false)); - } else { - SCH_ERR_RET(schExecJobImpl(transport, pNodeList, pDag, pJob, sql, 0, false)); - } - - return TSDB_CODE_SUCCESS; +int32_t schedulerAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, + int64_t startTs, SQueryResult *pRes, schedulerCallback fp, void* param) { + if (NULL == pTrans || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes || NULL == fp || NULL == param) { + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + SSchResInfo resInfo = {.queryRes = *pRes, .userFp = fp, .userParam = param}; + SCH_RET(schExecJob(pTrans, pNodeList, pDag, pJob, sql, startTs, &resInfo, false)); } int32_t schedulerFetchRows(int64_t job, void **pData) { -- GitLab