提交 5a202e89 编写于 作者: D dapan1121

feature/scheduler

上级 195ca4ab
......@@ -123,7 +123,8 @@ typedef struct SQWTaskCtx {
typedef struct SQWSchStatus {
int32_t lastAccessTs; // timestamp in second
SRWLatch connLock;
SQWConnInfo connInfo;
SQWConnInfo hbConnInfo;
SQueryNodeEpId epId;
SRWLatch tasksLock;
SHashObj *tasksHash; // key:queryId+taskId, value: SQWTaskStatus
} SQWSchStatus;
......
......@@ -580,6 +580,9 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
int32_t qwGenerateSchHbRsp(SQWorkerMgmt *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) {
int32_t taskNum = 0;
hbInfo->connInfo = sch->hbConnInfo;
hbInfo->rsp.epId = sch->epId;
QW_LOCK(QW_READ, &sch->tasksLock);
taskNum = taosHashGetSize(sch->tasksHash);
......@@ -591,8 +594,6 @@ int32_t qwGenerateSchHbRsp(SQWorkerMgmt *mgmt, SQWSchStatus *sch, SQWHbInfo *hbI
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
hbInfo->connInfo = sch->connInfo;
void *key = NULL;
size_t keyLen = 0;
int32_t i = 0;
......@@ -1229,9 +1230,12 @@ int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
QW_LOCK(QW_WRITE, &sch->connLock);
origHandle = sch->connInfo.handle;
if (sch->hbConnInfo.handle) {
rpcReleaseHandle(sch->hbConnInfo.handle, TAOS_CONN_SERVER);
}
memcpy(&sch->connInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo));
memcpy(&sch->hbConnInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo));
memcpy(&sch->epId, &req->epId, sizeof(req->epId));
QW_UNLOCK(QW_WRITE, &sch->connLock);
......
......@@ -672,15 +672,14 @@ int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) {
hb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
if (NULL == hb) {
qError("taosHashGet hb connection failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
SCH_ERR_RET(code);
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
SCH_LOCK(SCH_WRITE, &hb->lock);
memcpy(&hb->trans, trans, sizeof(*trans));
SCH_UNLOCK(SCH_WRITE, &hb->lock);
qDebug("hb connection updated, sId:%" PRIx64
", nodeId:%d, fqdn:%s, port:%d, instance:%p, handle:%p",
qDebug("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, instance:%p, handle:%p",
schMgmt.sId, epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->transInst,
trans->transHandle);
......@@ -1564,6 +1563,9 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet*
pMsgSendInfo->msgType = msgType;
pMsgSendInfo->fp = fp;
qDebug("start to send %s msg, refId:%" PRIx64 "instance:%p, handle:%p",
TMSG_INFO(msgType), pJob->refId, trans->transInst, trans->transHandle);
int64_t transporterId = 0;
code = asyncSendMsgToServerExt(trans->transInst, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx);
if (code) {
......@@ -1647,12 +1649,16 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) {
SEpSet epSet = {.inUse = 0, .numOfEps = 1};
memcpy(&epSet.eps[0], &nodeEpId->ep, sizeof(nodeEpId->ep));
qDebug("start to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d", trans.transInst, trans.transHandle, nodeEpId->ep.fqdn, nodeEpId->ep.port);
code = asyncSendMsgToServerExt(trans.transInst, &epSet, &transporterId, pMsgSendInfo, true, &rpcCtx);
if (code) {
qError("fail to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d, error:%x - %s",
trans.transInst, trans.transHandle, nodeEpId->ep.fqdn, nodeEpId->ep.port, code, tstrerror(code));
SCH_ERR_JRET(code);
}
qDebug("req msg sent, type:%d, %s", msgType, TMSG_INFO(msgType));
qDebug("hb msg sent");
return TSDB_CODE_SUCCESS;
_return:
......@@ -1833,6 +1839,7 @@ int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) {
epId.nodeId = addr->nodeId;
memcpy(&epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp));
#if 0
SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId));
if (NULL == hb) {
bool exist = false;
......@@ -1841,6 +1848,7 @@ int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_RET(schBuildAndSendHbMsg(&epId));
}
}
#endif
return TSDB_CODE_SUCCESS;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册