提交 e9564797 编写于 作者: D dapan1121

scheduler async api

上级 679c9460
......@@ -64,6 +64,9 @@ typedef struct STaskInfo {
SSubQueryMsg *msg;
} STaskInfo;
typedef void (*schedulerCallback)(SQueryResult* pResult, void* param, int32_t code);
int32_t schedulerInit(SSchedulerCfg *cfg);
/**
......
......@@ -40,8 +40,8 @@ enum {
};
typedef struct SSchTrans {
void *transInst;
void *transHandle;
void *pTrans;
void *pHandle;
} SSchTrans;
typedef struct SSchHbTrans {
......@@ -74,12 +74,17 @@ typedef struct SSchJobStat {
} SSchJobStat;
typedef struct SSchedulerStat {
typedef struct SSchStat {
SSchApiStat api;
SSchRuntimeStat runtime;
SSchJobStat job;
} SSchedulerStat;
} SSchStat;
typedef struct SSchResInfo {
SQueryResult queryRes;
schedulerCallback userFp;
void* userParam;
} SSchResInfo;
typedef struct SSchedulerMgmt {
uint64_t taskId; // sequential taksId
......@@ -89,7 +94,7 @@ typedef struct SSchedulerMgmt {
bool exit;
int32_t jobRef;
int32_t jobNum;
SSchedulerStat stat;
SSchStat stat;
SHashObj *hbConnections;
} SSchedulerMgmt;
......@@ -170,7 +175,7 @@ typedef struct SSchJob {
SSchJobAttr attr;
int32_t levelNum;
int32_t taskNum;
void *transport;
void *pTrans;
SArray *nodeList; // qnode/vnode list, SArray<SQueryNodeAddr>
SArray *levels; // starting from 0. SArray<SSchLevel>
SNodeList *subPlans; // subplan pointer copied from DAG, no need to free it in scheduler
......@@ -196,6 +201,7 @@ typedef struct SSchJob {
void *queryRes;
void *resData; //TODO free it or not
int32_t resNumOfRows;
SSchResInfo userRes;
const char *sql;
SQueryProfileSummary summary;
} SSchJob;
......@@ -292,7 +298,7 @@ int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCo
void schFreeRpcCtxVal(const void *arg);
int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb);
int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, void *handle);
int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
int32_t schExecStaticExplainJob(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
bool syncSchedule);
int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
int64_t startTs, bool sync);
......
......@@ -39,8 +39,8 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *
return TSDB_CODE_SUCCESS;
}
int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray *pNodeList, const char *sql,
int64_t startTs, bool syncSchedule) {
int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *pTrans, SArray *pNodeList, const char *sql,
SSchResInfo *pRes, int64_t startTs, bool syncSchedule) {
int32_t code = 0;
int64_t refId = -1;
SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
......@@ -51,8 +51,9 @@ int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray
pJob->attr.explainMode = pDag->explainInfo.mode;
pJob->attr.syncSchedule = syncSchedule;
pJob->transport = transport;
pJob->pTrans = pTrans;
pJob->sql = sql;
pJob->userRes = pRes;
if (pNodeList != NULL) {
pJob->nodeList = taosArrayDup(pNodeList);
......@@ -1228,8 +1229,8 @@ void schFreeJobImpl(void *job) {
schCloseJobRef();
}
int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
int64_t startTs, bool sync) {
int32_t schExecJobImpl(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
SSchResInfo *pRes, int64_t startTs, bool sync) {
qDebug("QID:0x%" PRIx64 " job started", pDag->queryId);
if (pNodeList == NULL || taosArrayGetSize(pNodeList) <= 0) {
......@@ -1238,7 +1239,7 @@ int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int
int32_t code = 0;
SSchJob *pJob = NULL;
SCH_ERR_JRET(schInitJob(&pJob, pDag, transport, pNodeList, sql, startTs, sync));
SCH_ERR_JRET(schInitJob(&pJob, pDag, pTrans, pNodeList, sql, pRes, startTs, sync));
SCH_ERR_JRET(schLaunchJob(pJob));
......@@ -1261,8 +1262,36 @@ _return:
SCH_RET(code);
}
int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
bool syncSchedule) {
int32_t schExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
int64_t startTs, SSchResInfo *pRes, bool sync) {
int32_t code = 0;
*pJob = 0;
if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) {
SCH_ERR_RET(schExecStaticExplainJob(pTrans, pNodeList, pDag, pJob, sql, pRes, sync));
} else {
SCH_ERR_JRET(schExecJobImpl(pTrans, pNodeList, pDag, pJob, sql, pRes, startTs, sync));
}
_return:
if (*pJob) {
SSchJob *job = schAcquireJob(*pJob);
pRes->code = atomic_load_32(&job->errCode);
pRes->numOfRows = job->resNumOfRows;
pRes->res = job->queryRes;
job->queryRes = NULL;
schReleaseJob(*pJob);
}
return code;
}
int32_t schExecStaticExplainJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
SSchResInfo *pRes, bool sync) {
qDebug("QID:0x%" PRIx64 " job started", pDag->queryId);
int32_t code = 0;
......@@ -1277,7 +1306,8 @@ int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDa
pJob->attr.explainMode = pDag->explainInfo.mode;
pJob->queryId = pDag->queryId;
pJob->subPlans = pDag->pSubplans;
pJob->userRes = pRes;
SCH_ERR_JRET(qExecStaticExplain(pDag, (SRetrieveTableRsp **)&pJob->resData));
int64_t refId = taosAddRef(schMgmt.jobRef, pJob);
......
......@@ -481,7 +481,7 @@ int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, int32_t msgType,
param->queryId = pJob->queryId;
param->refId = pJob->refId;
param->taskId = SCH_TASK_ID(pTask);
param->transport = pJob->transport;
param->transport = pJob->pTrans;
msgSendInfo->param = param;
msgSendInfo->fp = fp;
......@@ -556,7 +556,7 @@ int32_t schMakeHbCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) {
param->nodeEpId.nodeId = addr->nodeId;
memcpy(&param->nodeEpId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp));
param->transport = pJob->transport;
param->transport = pJob->pTrans;
*pParam = param;
......@@ -638,7 +638,7 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
SCH_ERR_JRET(schGetCallbackFp(TDMT_VND_QUERY_HEARTBEAT, &fp));
param->nodeEpId = epId;
param->transport = pJob->transport;
param->transport = pJob->pTrans;
pMsgSendInfo->param = param;
pMsgSendInfo->fp = fp;
......@@ -666,7 +666,7 @@ int32_t schRegisterHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *
int32_t code = 0;
SSchHbTrans hb = {0};
hb.trans.transInst = pJob->transport;
hb.trans.pTrans = pJob->pTrans;
SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &hb.rpcCtx));
......@@ -743,12 +743,12 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) {
__async_send_cb_fn_t fp = NULL;
SCH_ERR_JRET(schGetCallbackFp(msgType, &fp));
param->transport = trans.transInst;
param->transport = trans.pTrans;
pMsgSendInfo->param = param;
pMsgSendInfo->msgInfo.pData = msg;
pMsgSendInfo->msgInfo.len = msgSize;
pMsgSendInfo->msgInfo.handle = trans.transHandle;
pMsgSendInfo->msgInfo.handle = trans.pHandle;
pMsgSendInfo->msgType = msgType;
pMsgSendInfo->fp = fp;
......@@ -756,13 +756,13 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) {
SEpSet epSet = {.inUse = 0, .numOfEps = 1};
memcpy(&epSet.eps[0], &nodeEpId->ep, sizeof(nodeEpId->ep));
qDebug("start to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d", trans.transInst, trans.transHandle,
qDebug("start to send hb msg, pTrans:%p, pHandle:%p, fqdn:%s, port:%d", trans.pTrans, trans.pHandle,
nodeEpId->ep.fqdn, nodeEpId->ep.port);
code = asyncSendMsgToServerExt(trans.transInst, &epSet, &transporterId, pMsgSendInfo, true, &rpcCtx);
code = asyncSendMsgToServerExt(trans.pTrans, &epSet, &transporterId, pMsgSendInfo, true, &rpcCtx);
if (code) {
qError("fail to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d, error:%x - %s", trans.transInst,
trans.transHandle, nodeEpId->ep.fqdn, nodeEpId->ep.port, code, tstrerror(code));
qError("fail to send hb msg, pTrans:%p, pHandle:%p, fqdn:%s, port:%d, error:%x - %s", trans.pTrans,
trans.pHandle, nodeEpId->ep.fqdn, nodeEpId->ep.port, code, tstrerror(code));
SCH_ERR_JRET(code);
}
......@@ -812,8 +812,8 @@ int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) {
memcpy(&hb->trans, trans, sizeof(*trans));
SCH_UNLOCK(SCH_WRITE, &hb->lock);
qDebug("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, instance:%p, handle:%p", schMgmt.sId,
epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->transInst, trans->transHandle);
qDebug("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, pTrans:%p, pHandle:%p", schMgmt.sId,
epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->pTrans, trans->pHandle);
return TSDB_CODE_SUCCESS;
}
......@@ -833,8 +833,8 @@ int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) {
}
SSchTrans trans = {0};
trans.transInst = pParam->transport;
trans.transHandle = pMsg->handle;
trans.pTrans = pParam->transport;
trans.pHandle = pMsg->handle;
SCH_ERR_JRET(schUpdateHbConnection(&rsp.epId, &trans));
......@@ -879,7 +879,7 @@ int32_t schMakeCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) {
param->queryId = pJob->queryId;
param->refId = pJob->refId;
param->taskId = SCH_TASK_ID(pTask);
param->transport = pJob->transport;
param->transport = pJob->pTrans;
*pParam = param;
......@@ -1034,15 +1034,15 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet
pMsgSendInfo->msgInfo.pData = msg;
pMsgSendInfo->msgInfo.len = msgSize;
pMsgSendInfo->msgInfo.handle = trans->transHandle;
pMsgSendInfo->msgInfo.handle = trans->pHandle;
pMsgSendInfo->msgType = msgType;
qDebug("start to send %s msg to node[%d,%s,%d], refId:%" PRIx64 "instance:%p, handle:%p", TMSG_INFO(msgType),
qDebug("start to send %s msg to node[%d,%s,%d], refId:%" PRIx64 "pTrans:%p, pHandle:%p", TMSG_INFO(msgType),
ntohl(((SMsgHead *)msg)->vgId), epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, pJob->refId,
trans->transInst, trans->transHandle);
trans->pTrans, trans->pHandle);
int64_t transporterId = 0;
code = asyncSendMsgToServerExt(trans->transInst, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx);
code = asyncSendMsgToServerExt(trans->pTrans, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx);
if (code) {
SCH_ERR_JRET(code);
}
......@@ -1208,12 +1208,12 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
SCH_SET_TASK_LASTMSG_TYPE(pTask, msgType);
SSchTrans trans = {.transInst = pJob->transport, .transHandle = SCH_GET_TASK_HANDLE(pTask)};
SSchTrans trans = {.transInst = pJob->pTrans, .transHandle = SCH_GET_TASK_HANDLE(pTask)};
SCH_ERR_JRET(schAsyncSendMsg(pJob, pTask, &trans, &epSet, msgType, msg, msgSize, persistHandle,
(rpcCtx.args ? &rpcCtx : NULL)));
if (msgType == TDMT_VND_QUERY) {
SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr, trans.transHandle));
SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr, trans.pHandle));
}
return TSDB_CODE_SUCCESS;
......
......@@ -67,50 +67,24 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
return TSDB_CODE_SUCCESS;
}
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
int32_t schedulerExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
int64_t startTs, SQueryResult *pRes) {
if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) {
if (NULL == pTrans || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
int32_t code = 0;
*pJob = 0;
if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) {
SCH_ERR_RET(schExecStaticExplain(transport, nodeList, pDag, pJob, sql, true));
} else {
SCH_ERR_JRET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, startTs, true));
}
_return:
if (*pJob) {
SSchJob *job = schAcquireJob(*pJob);
pRes->code = atomic_load_32(&job->errCode);
pRes->numOfRows = job->resNumOfRows;
pRes->res = job->queryRes;
job->queryRes = NULL;
schReleaseJob(*pJob);
}
return code;
SSchResInfo resInfo = {.queryRes = *pRes};
SCH_RET(schExecJob(pTrans, pNodeList, pDag, pJob, sql, startTs, &resInfo, true));
}
int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan *pDag, const char *sql, int64_t *pJob) {
if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) {
SCH_ERR_RET(schExecStaticExplain(transport, pNodeList, pDag, pJob, sql, false));
} else {
SCH_ERR_RET(schExecJobImpl(transport, pNodeList, pDag, pJob, sql, 0, false));
}
return TSDB_CODE_SUCCESS;
int32_t schedulerAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
int64_t startTs, SQueryResult *pRes, schedulerCallback fp, void* param) {
if (NULL == pTrans || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes || NULL == fp || NULL == param) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SSchResInfo resInfo = {.queryRes = *pRes, .userFp = fp, .userParam = param};
SCH_RET(schExecJob(pTrans, pNodeList, pDag, pJob, sql, startTs, &resInfo, false));
}
int32_t schedulerFetchRows(int64_t job, void **pData) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册