提交 3b2a5005 编写于 作者: D dapan1121

feature/scheduler

上级 ed477de8
......@@ -697,8 +697,8 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
int32_t code = 0;
SQWTaskCtx *ctx = NULL;
void *dropConnection = NULL;
void *cancelConnection = NULL;
SQWConnInfo *dropConnection = NULL;
SQWConnInfo *cancelConnection = NULL;
QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));
......@@ -793,12 +793,12 @@ _return:
if (dropConnection) {
qwBuildAndSendDropRsp(dropConnection, code);
QW_TASK_DLOG("drop msg rsped, code:%x - %s", code, tstrerror(code));
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", dropConnection->handle, code, tstrerror(code));
}
if (cancelConnection) {
qwBuildAndSendCancelRsp(cancelConnection, code);
QW_TASK_DLOG("cancel msg rsped, code:%x - %s", code, tstrerror(code));
QW_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", cancelConnection->handle, code, tstrerror(code));
}
QW_TASK_DLOG("end to handle event at phase %s, code:%x - %s", qwPhaseStr(phase), code, tstrerror(code));
......@@ -811,7 +811,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
int32_t code = 0;
SQWTaskCtx *ctx = NULL;
SQWConnInfo connInfo = {0};
void *readyConnection = NULL;
SQWConnInfo *readyConnection = NULL;
QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));
......@@ -879,7 +879,7 @@ _return:
if (TSDB_CODE_SUCCESS == code && readyConnection) {
qwBuildAndSendReadyRsp(readyConnection, code);
QW_TASK_DLOG("ready msg rsped, code:%x - %s", code, tstrerror(code));
QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", readyConnection->handle, code, tstrerror(code));
}
if (code) {
......@@ -907,7 +907,9 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
atomic_store_8(&ctx->taskType, taskType);
atomic_store_ptr(&ctx->connInfo.handle, qwMsg->connInfo.handle);
atomic_store_ptr(&ctx->connInfo.ahandle, qwMsg->connInfo.ahandle);
code = qStringToSubplan(qwMsg->msg, &plan);
if (TSDB_CODE_SUCCESS != code) {
QW_TASK_ELOG("task string to subplan failed, code:%x - %s", code, tstrerror(code));
......@@ -926,7 +928,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
}
QW_ERR_JRET(qwBuildAndSendQueryRsp(&qwMsg->connInfo, code));
QW_TASK_DLOG("query msg rsped, code:%x - %s", code, tstrerror(code));
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
queryRsped = true;
......@@ -944,7 +946,7 @@ _return:
if (!queryRsped) {
qwBuildAndSendQueryRsp(&qwMsg->connInfo, code);
QW_TASK_DLOG("query msg rsped, code:%x - %s", code, tstrerror(code));
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
}
QW_RET(TSDB_CODE_SUCCESS);
......@@ -1007,7 +1009,7 @@ _return:
if (needRsp) {
qwBuildAndSendReadyRsp(&qwMsg->connInfo, code);
QW_TASK_DLOG("ready msg rsped, code:%x - %s", code, tstrerror(code));
QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
}
QW_RET(TSDB_CODE_SUCCESS);
......@@ -1050,7 +1052,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code);
QW_TASK_DLOG("fetch msg rsped, code:%x, dataLen:%d", code, dataLen);
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), dataLen);
} else {
atomic_store_8(&ctx->queryContinue, 1);
}
......@@ -1067,7 +1069,7 @@ _return:
qwFreeFetchRsp(rsp);
rsp = NULL;
qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, 0, code);
QW_TASK_DLOG("fetch msg rsped, code:%x - %s", code, tstrerror(code));
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), 0);
}
QW_LOCK(QW_WRITE, &ctx->lock);
......@@ -1147,7 +1149,7 @@ _return:
if (code || rsp) {
qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code);
QW_TASK_DLOG("fetch msg rsped, code:%x, dataLen:%d", code, dataLen);
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), dataLen);
}
QW_RET(TSDB_CODE_SUCCESS);
......@@ -1210,8 +1212,7 @@ _return:
if (TSDB_CODE_SUCCESS != code || needRsp) {
qwBuildAndSendDropRsp(&qwMsg->connInfo, code);
QW_TASK_DLOG("drop msg rsped, code:%x", code);
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
}
QW_RET(TSDB_CODE_SUCCESS);
......@@ -1247,6 +1248,7 @@ int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
_return:
qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code);
QW_DLOG("hb rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
QW_RET(TSDB_CODE_SUCCESS);
}
......@@ -1297,8 +1299,8 @@ _return:
QW_UNLOCK(QW_READ, &mgmt->schLock);
for (int32_t j = 0; j < i; ++j) {
QW_DLOG("hb on connection handle %p, taskNum:%d", rspList[j].connInfo.handle, (rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0));
qwBuildAndSendHbRsp(&rspList[j].connInfo, &rspList[j].rsp, code);
QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code), (rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0));
tFreeSSchedulerHbRsp(&rspList[j].rsp);
}
......
......@@ -324,7 +324,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
qwMsg.connInfo.ahandle = pMsg->ahandle;
char* sql = strndup(msg->msg, msg->sqlLen);
QW_SCH_TASK_DLOG("processQuery start, node:%p, sql:%s", node, sql);
QW_SCH_TASK_DLOG("processQuery start, node:%p, handle:%p, sql:%s", node, pMsg->handle, sql);
tfree(sql);
QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType));
......@@ -357,7 +357,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle;
QW_SCH_TASK_DLOG("processCQuery start, node:%p", node);
QW_SCH_TASK_DLOG("processCQuery start, node:%p, handle:%p", node, pMsg->handle);
QW_ERR_RET(qwProcessCQuery(QW_FPARAMS(), &qwMsg));
......@@ -391,7 +391,7 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){
qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle;
QW_SCH_TASK_DLOG("processReady start, node:%p", node);
QW_SCH_TASK_DLOG("processReady start, node:%p, handle:%p", node, pMsg->handle);
QW_ERR_RET(qwProcessReady(QW_FPARAMS(), &qwMsg));
......@@ -453,7 +453,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle;
QW_SCH_TASK_DLOG("processFetch start, node:%p", node);
QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->handle);
QW_ERR_RET(qwProcessFetch(QW_FPARAMS(), &qwMsg));
......@@ -472,6 +472,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
int32_t code = 0;
STaskCancelReq *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
......@@ -484,6 +485,11 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
msg->taskId = be64toh(msg->taskId);
msg->refId = be64toh(msg->refId);
uint64_t sId = msg->sId;
uint64_t qId = msg->queryId;
uint64_t tId = msg->taskId;
int64_t rId = msg->refId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle;
......@@ -493,6 +499,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
_return:
QW_ERR_RET(qwBuildAndSendCancelRsp(&qwMsg.connInfo, code));
QW_SCH_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", qwMsg.connInfo.handle, code, tstrerror(code));
return TSDB_CODE_SUCCESS;
}
......@@ -525,7 +532,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle;
QW_SCH_TASK_DLOG("processDrop start, node:%p", node);
QW_SCH_TASK_DLOG("processDrop start, node:%p, handle:%p", node, pMsg->handle);
QW_ERR_RET(qwProcessDrop(QW_FPARAMS(), &qwMsg));
......@@ -559,7 +566,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle;
QW_SCH_DLOG("processHb start, node:%p", node);
QW_SCH_DLOG("processHb start, node:%p, handle:%p", node, pMsg->handle);
QW_ERR_RET(qwProcessHb(mgmt, &qwMsg, &req));
......
......@@ -137,7 +137,18 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m
return TSDB_CODE_SUCCESS;
case TDMT_VND_RES_READY_RSP:
reqMsgType = TDMT_VND_QUERY;
break;
if (lastMsgType != reqMsgType && -1 != lastMsgType) {
SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", (lastMsgType > 0 ? TMSG_INFO(lastMsgType) : "null"), TMSG_INFO(msgType));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus), TMSG_INFO(msgType));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
return TSDB_CODE_SUCCESS;
case TDMT_VND_CREATE_TABLE_RSP:
case TDMT_VND_SUBMIT_RSP:
case TDMT_VND_FETCH_RSP:
......@@ -1085,7 +1096,7 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in
}
pTask = *task;
SCH_TASK_DLOG("rsp msg received, type:%s, code:%s", TMSG_INFO(msgType), tstrerror(rspCode));
SCH_TASK_DLOG("rsp msg received, type:%s, handle:%p, code:%s", TMSG_INFO(msgType), pMsg->handle, tstrerror(rspCode));
pTask->handle = pMsg->handle;
SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode));
......@@ -1174,6 +1185,8 @@ int32_t schHandleLinkBrokenCallback(void *param, const SDataBuf *pMsg, int32_t c
SSchCallbackParamHeader *head = (SSchCallbackParamHeader *)param;
rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT);
qDebug("handle %p is broken", pMsg->handle);
if (head->isHbParam) {
SSchHbCallbackParam *hbParam = (SSchHbCallbackParam *)param;
SSchTrans trans = {.transInst = hbParam->transport, .transHandle = NULL};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册