提交 f51203bb 编写于 作者: D dapan1121

feature/scheduler

上级 ae6224d9
......@@ -146,6 +146,8 @@ int32_t cleanupTaskQueue();
*/
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code);
int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo, bool persistHandle, SRpcCtx *ctx);
/**
* Asynchronously send message to server, after the response received, the callback will be incured.
*
......
......@@ -140,7 +140,7 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code)
return 0;
}
int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) {
int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo, bool persistHandle, SRpcCtx *ctx) {
char* pMsg = rpcMallocCont(pInfo->msgInfo.len);
if (NULL == pMsg) {
qError("0x%" PRIx64 " msg:%s malloc failed", pInfo->requestId, TMSG_INFO(pInfo->msgType));
......@@ -154,14 +154,19 @@ int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransp
.contLen = pInfo->msgInfo.len,
.ahandle = (void*)pInfo,
.handle = pInfo->msgInfo.handle,
.persistHandle = persistHandle,
.code = 0};
assert(pInfo->fp != NULL);
rpcSendRequest(pTransporter, epSet, &rpcMsg, pTransporterId);
rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, ctx);
return TSDB_CODE_SUCCESS;
}
int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) {
return asyncSendMsgToServerExt(pTransporter, epSet, pTransporterId, pInfo, false, NULL);
}
char *jobTaskStatusStr(int32_t status) {
switch (status) {
case JOB_TASK_STATUS_NULL:
......
......@@ -70,11 +70,16 @@ typedef struct SQWDebug {
bool statusEnable;
} SQWDebug;
typedef struct SQWConnInfo {
void *handle;
void *ahandle;
} SQWConnInfo;
typedef struct SQWMsg {
void *node;
char *msg;
int32_t msgLen;
void *connection;
SQWConnInfo connInfo;
} SQWMsg;
typedef struct SQWHbInfo {
......@@ -101,10 +106,6 @@ typedef struct SQWTaskCtx {
int8_t phase;
int8_t taskType;
void *readyConnection;
void *dropConnection;
void *cancelConnection;
bool emptyRes;
bool queryFetched;
bool queryEnd;
......@@ -112,6 +113,7 @@ typedef struct SQWTaskCtx {
bool queryInQueue;
int32_t rspCode;
SQWConnInfo connInfo;
int8_t events[QW_EVENT_MAX];
qTaskInfo_t taskHandle;
......@@ -121,7 +123,7 @@ typedef struct SQWTaskCtx {
typedef struct SQWSchStatus {
int32_t lastAccessTs; // timestamp in second
uint64_t hbSeqId;
void *hbConnection;
SQWConnInfo *hbConnection;
SRWLatch tasksLock;
SHashObj *tasksHash; // key:queryId+taskId, value: SQWTaskStatus
} SQWSchStatus;
......
......@@ -41,6 +41,7 @@ void qwFreeFetchRsp(void *msg);
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp);
int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp);
int32_t qwBuildAndSendHbRsp(SRpcMsg *pMsg, SSchedulerHbRsp *rsp, int32_t code);
int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn);
......
......@@ -402,6 +402,9 @@ int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
rpcReleaseHandle(ctx->connInfo.handle, CONN_SERVER);
ctx->connInfo.handle = NULL;
qwFreeTaskHandle(QW_FPARAMS(), &ctx->taskHandle);
if (ctx->sinkHandle) {
......@@ -729,7 +732,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
dropConnection = ctx->dropConnection;
dropConnection = &ctx->connInfo;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
break;
}
......@@ -763,7 +766,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
dropConnection = ctx->dropConnection;
dropConnection = &ctx->connInfo;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
}
......@@ -807,9 +810,8 @@ _return:
int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
int32_t code = 0;
SQWTaskCtx *ctx = NULL;
SQWConnInfo connInfo = {0};
void *readyConnection = NULL;
void *dropConnection = NULL;
void *cancelConnection = NULL;
QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));
......@@ -827,10 +829,17 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
ctx->emptyRes = true;
}
#if 0
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_READY)) {
readyConnection = ctx->readyConnection;
readyConnection = &ctx->connInfo;
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
}
#else
connInfo.handle = ctx->connInfo.handle;
readyConnection = &connInfo;
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
#endif
}
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
......@@ -841,7 +850,6 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
dropConnection = ctx->dropConnection;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
}
......@@ -869,21 +877,11 @@ _return:
qwReleaseTaskCtx(mgmt, ctx);
}
if (readyConnection) {
if (TSDB_CODE_SUCCESS == code && readyConnection) {
qwBuildAndSendReadyRsp(readyConnection, code);
QW_TASK_DLOG("ready msg rsped, code:%x - %s", code, tstrerror(code));
}
if (dropConnection) {
qwBuildAndSendDropRsp(dropConnection, code);
QW_TASK_DLOG("drop msg rsped, code:%x - %s", code, tstrerror(code));
}
if (cancelConnection) {
qwBuildAndSendCancelRsp(cancelConnection, code);
QW_TASK_DLOG("cancel msg rsped, code:%x - %s", code, tstrerror(code));
}
if (code) {
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED);
}
......@@ -893,17 +891,17 @@ _return:
QW_RET(code);
}
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
int32_t code = 0;
bool queryRsped = false;
bool needStop = false;
struct SSubplan *plan = NULL;
SQWPhaseInput input = {0};
qTaskInfo_t pTaskInfo = NULL;
DataSinkHandle sinkHandle = NULL;
SQWTaskCtx *ctx = NULL;
QW_ERR_JRET(qwRegisterBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo));
QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, NULL));
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
......@@ -927,7 +925,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
QW_ERR_JRET(qwBuildAndSendQueryRsp(qwMsg->connection, code));
QW_ERR_JRET(qwBuildAndSendQueryRsp(&qwMsg->connInfo, code));
QW_TASK_DLOG("query msg rsped, code:%x - %s", code, tstrerror(code));
queryRsped = true;
......@@ -945,7 +943,7 @@ _return:
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
if (!queryRsped) {
qwBuildAndSendQueryRsp(qwMsg->connection, code);
qwBuildAndSendQueryRsp(&qwMsg->connInfo, code);
QW_TASK_DLOG("query msg rsped, code:%x - %s", code, tstrerror(code));
}
......@@ -968,8 +966,9 @@ int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
}
if (ctx->phase == QW_PHASE_PRE_QUERY) {
ctx->connInfo.handle == qwMsg->connInfo.handle;
ctx->connInfo.ahandle = qwMsg->connInfo.ahandle;
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_READY);
ctx->readyConnection = qwMsg->connection;
needRsp = false;
QW_TASK_DLOG_E("ready msg will not rsp now");
goto _return;
......@@ -1007,7 +1006,7 @@ _return:
}
if (needRsp) {
qwBuildAndSendReadyRsp(qwMsg->connection, code);
qwBuildAndSendReadyRsp(&qwMsg->connInfo, code);
QW_TASK_DLOG("ready msg rsped, code:%x - %s", code, tstrerror(code));
}
......@@ -1050,7 +1049,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
qwBuildAndSendFetchRsp(qwMsg->connection, rsp, dataLen, code);
qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code);
QW_TASK_DLOG("fetch msg rsped, code:%x, dataLen:%d", code, dataLen);
} else {
atomic_store_8(&ctx->queryContinue, 1);
......@@ -1067,7 +1066,7 @@ _return:
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
qwFreeFetchRsp(rsp);
rsp = NULL;
qwBuildAndSendFetchRsp(qwMsg->connection, rsp, 0, code);
qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, 0, code);
QW_TASK_DLOG("fetch msg rsped, code:%x - %s", code, tstrerror(code));
}
......@@ -1102,6 +1101,8 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
if (NULL == rsp) {
atomic_store_ptr(&ctx->connInfo.handle, qwMsg->connInfo.handle);
atomic_store_ptr(&ctx->connInfo.ahandle, qwMsg->connInfo.ahandle);
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH);
} else {
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
......@@ -1123,7 +1124,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
atomic_store_8(&ctx->queryInQueue, 1);
QW_ERR_JRET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), qwMsg->connection));
QW_ERR_JRET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), &qwMsg->connInfo));
}
}
......@@ -1143,7 +1144,7 @@ _return:
}
if (code || rsp) {
qwBuildAndSendFetchRsp(qwMsg->connection, rsp, dataLen, code);
qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code);
QW_TASK_DLOG("fetch msg rsped, code:%x, dataLen:%d", code, dataLen);
}
......@@ -1181,7 +1182,8 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
}
if (!needRsp) {
ctx->dropConnection = qwMsg->connection;
ctx->connInfo.handle == qwMsg->connInfo.handle;
ctx->connInfo.ahandle = qwMsg->connInfo.ahandle;
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
}
......@@ -1205,7 +1207,7 @@ _return:
}
if (TSDB_CODE_SUCCESS != code || needRsp) {
QW_ERR_RET(qwBuildAndSendDropRsp(qwMsg->connection, code));
QW_ERR_RET(qwBuildAndSendDropRsp(&qwMsg->connInfo, code));
QW_TASK_DLOG("drop msg rsped, code:%x", code);
}
......@@ -1223,27 +1225,25 @@ int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch));
atomic_store_ptr(&sch->hbConnection, qwMsg->connection);
atomic_store_ptr(&sch->hbConnection, qwMsg->connInfo);
++sch->hbSeqId;
rsp.seqId = sch->hbSeqId;
QW_DLOG("hb connection updated, seqId:%" PRIx64 ", sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, connection:%p",
sch->hbSeqId, req->sId, req->epId.nodeId, req->epId.ep.fqdn, req->epId.ep.port, qwMsg->connection);
QW_DLOG("hb connection updated, seqId:%" PRIx64 ", sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, handle:%p, ahandle:%p",
sch->hbSeqId, req->sId, req->epId.nodeId, req->epId.ep.fqdn, req->epId.ep.port, qwMsg->connInfo.handle, qwMsg->connInfo.ahandle);
qwReleaseScheduler(QW_READ, mgmt);
_return:
qwBuildAndSendHbRsp(qwMsg->connection, &rsp, code);
qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code);
QW_RET(code);
}
void qwProcessHbTimerEvent(void *param, void *tmrId) {
return;
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)param;
SQWSchStatus *sch = NULL;
int32_t taskNum = 0;
......
......@@ -26,6 +26,8 @@ int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) {
return TSDB_CODE_SUCCESS;
}
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete) {
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
......@@ -44,8 +46,7 @@ void qwFreeFetchRsp(void *msg) {
}
}
int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code) {
SRpcMsg *pMsg = (SRpcMsg *)connection;
int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code) {
SQueryTableRsp rsp = {.code = code};
int32_t contLen = tSerializeSQueryTableRsp(NULL, 0, &rsp);
......@@ -54,8 +55,8 @@ int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code) {
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_QUERY_RSP,
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.handle = pConn->handle,
.ahandle = pConn->ahandle,
.pCont = msg,
.contLen = contLen,
.code = code,
......@@ -66,15 +67,14 @@ int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code) {
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendReadyRsp(void *connection, int32_t code) {
SRpcMsg *pMsg = (SRpcMsg *)connection;
int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code) {
SResReadyRsp *pRsp = (SResReadyRsp *)rpcMallocCont(sizeof(SResReadyRsp));
pRsp->code = code;
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_RES_READY_RSP,
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.handle = pConn->handle,
.ahandle = pConn->ahandle,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = code,
......@@ -85,15 +85,15 @@ int32_t qwBuildAndSendReadyRsp(void *connection, int32_t code) {
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendHbRsp(SRpcMsg *pMsg, SSchedulerHbRsp *pStatus, int32_t code) {
int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) {
int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus);
void *pRsp = rpcMallocCont(contLen);
tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus);
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_QUERY_HEARTBEAT_RSP,
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.handle = pConn->handle,
.ahandle = pConn->ahandle,
.pCont = pRsp,
.contLen = contLen,
.code = code,
......@@ -104,9 +104,7 @@ int32_t qwBuildAndSendHbRsp(SRpcMsg *pMsg, SSchedulerHbRsp *pStatus, int32_t cod
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) {
SRpcMsg *pMsg = (SRpcMsg *)connection;
int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) {
if (NULL == pRsp) {
pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
memset(pRsp, 0, sizeof(SRetrieveTableRsp));
......@@ -115,8 +113,8 @@ int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_FETCH_RSP,
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.handle = pConn->handle,
.ahandle = pConn->ahandle,
.pCont = pRsp,
.contLen = sizeof(*pRsp) + dataLength,
.code = code,
......@@ -127,14 +125,14 @@ int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code) {
int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code) {
STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp));
pRsp->code = code;
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_CANCEL_TASK_RSP,
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.handle = pConn->handle,
.ahandle = pConn->ahandle,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = code,
......@@ -144,15 +142,14 @@ int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code) {
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendDropRsp(void *connection, int32_t code) {
SRpcMsg *pMsg = (SRpcMsg *)connection;
int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code) {
STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp));
pRsp->code = code;
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_DROP_TASK_RSP,
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.handle = pConn->handle,
.ahandle = pConn->ahandle,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = code,
......@@ -234,8 +231,7 @@ int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchRe
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, void *connection) {
SRpcMsg *pMsg = (SRpcMsg *)connection;
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
SQueryContinueReq * req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq));
if (NULL == req) {
QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(SQueryContinueReq));
......@@ -248,8 +244,8 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, void *connection) {
req->taskId = tId;
SRpcMsg pNewMsg = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.handle = pConn->handle,
.ahandle = pConn->ahandle,
.msgType = TDMT_VND_QUERY_CONTINUE,
.pCont = req,
.contLen = sizeof(SQueryContinueReq),
......@@ -268,6 +264,35 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, void *connection) {
return TSDB_CODE_SUCCESS;
}
int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
STaskDropReq * req = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq));
if (NULL == req) {
QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropReq));
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
req->header.vgId = mgmt->nodeId;
req->sId = sId;
req->queryId = qId;
req->taskId = tId;
req->refId = rId;
SRpcMsg pMsg = {
.handle = pConn->handle,
.ahandle = pConn->ahandle,
.msgType = TDMT_VND_DROP_TASK,
.pCont = req,
.contLen = sizeof(STaskDropReq),
.code = TSDB_CODE_RPC_NETWORK_UNAVAIL,
};
rpcRegisterBrokenLinkArg(&pMsg);
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
......@@ -294,7 +319,9 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
uint64_t tId = msg->taskId;
int64_t rId = msg->refId;
SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connection = pMsg};
SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen};
qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle;
char* sql = strndup(msg->msg, msg->sqlLen);
QW_SCH_TASK_DLOG("processQuery start, node:%p, sql:%s", node, sql);
......@@ -326,7 +353,9 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
uint64_t tId = msg->taskId;
int64_t rId = 0;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg};
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle;
QW_SCH_TASK_DLOG("processCQuery start, node:%p", node);
......@@ -358,7 +387,9 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){
uint64_t tId = msg->taskId;
int64_t rId = 0;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg};
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle;
QW_SCH_TASK_DLOG("processReady start, node:%p", node);
......@@ -418,7 +449,9 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
uint64_t tId = msg->taskId;
int64_t rId = 0;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg};
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle;
QW_SCH_TASK_DLOG("processFetch start, node:%p", node);
......@@ -484,7 +517,9 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
uint64_t tId = msg->taskId;
int64_t rId = msg->refId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg};
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle;
QW_SCH_TASK_DLOG("processDrop start, node:%p", node);
......@@ -516,7 +551,9 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
}
uint64_t sId = req.sId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg};
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle;
QW_SCH_DLOG("processHb start, node:%p", node);
......
......@@ -70,6 +70,22 @@ int32_t schInitTask(SSchJob* pJob, SSchTask *pTask, SSubplan* pPlan, SSchLevel *
return TSDB_CODE_SUCCESS;
}
void schFreeRpcCtx(SRpcCtx *pCtx) {
if (NULL == pCtx) {
return;
}
void *pIter = taosHashIterate(pCtx->args, NULL);
while (pIter) {
SRpcCtxVal *ctxVal = (SRpcCtxVal *)pIter;
ctxVal->free(ctxVal->v);
pIter = taosHashIterate(pCtx->args, pIter);
}
taosHashCleanup(pCtx->args);
}
void schFreeTask(SSchTask* pTask) {
if (pTask->candidateAddrs) {
taosArrayDestroy(pTask->candidateAddrs);
......@@ -106,14 +122,32 @@ static FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
int32_t lastMsgType = SCH_GET_TASK_LASTMSG_TYPE(pTask);
int32_t taskStatus = SCH_GET_TASK_STATUS(pTask);
int32_t reqMsgType = msgType - 1;
switch (msgType) {
case TDMT_VND_QUERY_RSP: // query_rsp may be processed later than ready_rsp
if (lastMsgType != reqMsgType) {
SCH_TASK_DLOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType), TMSG_INFO(msgType));
}
if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
SCH_TASK_DLOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus), TMSG_INFO(msgType));
}
SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
return;
case TDMT_VND_RES_READY_RSP:
reqMsgType = TDMT_VND_QUERY;
break;
case TDMT_VND_CREATE_TABLE_RSP:
case TDMT_VND_SUBMIT_RSP:
case TDMT_VND_QUERY_RSP:
case TDMT_VND_RES_READY_RSP:
case TDMT_VND_FETCH_RSP:
case TDMT_VND_DROP_TASK:
if (lastMsgType != (msgType - 1)) {
break;
default:
SCH_TASK_ELOG("unknown rsp msg, type:%s, status:%s", TMSG_INFO(msgType), jobTaskStatusStr(taskStatus));
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
if (lastMsgType != reqMsgType) {
SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType), TMSG_INFO(msgType));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
......@@ -123,13 +157,6 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
break;
default:
SCH_TASK_ELOG("unknown rsp msg, type:%s, status:%s", TMSG_INFO(msgType), jobTaskStatusStr(taskStatus));
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
return TSDB_CODE_SUCCESS;
......@@ -1006,7 +1033,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, TDMT_VND_RES_READY));
//SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, TDMT_VND_RES_READY));
break;
}
......@@ -1212,8 +1239,122 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
return TSDB_CODE_SUCCESS;
}
void schFreeRpcCtxVal(void *arg) {
if (NULL == arg) {
return;
}
SMsgSendInfo* pMsgSendInfo = arg;
tfree(pMsgSendInfo->param);
tfree(pMsgSendInfo);
}
int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
int32_t code = 0;
SSchCallbackParam *param = NULL;
SMsgSendInfo* pMsgSendInfo = NULL;
pCtx->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
if (NULL == pCtx->args) {
SCH_TASK_ELOG("taosHashInit %d RpcCtx failed", 1);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
param = calloc(1, sizeof(SSchCallbackParam));
if (NULL == param) {
SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchCallbackParam));
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
int32_t msgType = TDMT_VND_RES_READY_RSP;
__async_send_cb_fn_t fp = NULL;
SCH_ERR_JRET(schGetCallbackFp(TDMT_VND_RES_READY, &fp));
param->queryId = pJob->queryId;
param->refId = pJob->refId;
param->taskId = SCH_TASK_ID(pTask);
param->transport = pJob->transport;
pMsgSendInfo->param = param;
pMsgSendInfo->fp = fp;
SRpcCtxVal ctxVal = {.v = pMsgSendInfo, .len = sizeof(SMsgSendInfo), .free = schFreeRpcCtxVal};
if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
return TSDB_CODE_SUCCESS;
_return:
taosHashCleanup(pCtx->args);
tfree(param);
tfree(pMsgSendInfo);
SCH_RET(code);
}
int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
int32_t code = 0;
SSchCallbackParam *param = NULL;
SMsgSendInfo* pMsgSendInfo = NULL;
pCtx->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
if (NULL == pCtx->args) {
SCH_TASK_ELOG("taosHashInit %d RpcCtx failed", 1);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
param = calloc(1, sizeof(SSchCallbackParam));
if (NULL == param) {
SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchCallbackParam));
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
int32_t msgType = TDMT_VND_QUERY_HEARTBEAT_RSP;
__async_send_cb_fn_t fp = NULL;
SCH_ERR_JRET(schGetCallbackFp(TDMT_VND_QUERY_HEARTBEAT, &fp));
param->queryId = pJob->queryId;
param->refId = pJob->refId;
param->taskId = SCH_TASK_ID(pTask);
param->transport = pJob->transport;
pMsgSendInfo->param = param;
pMsgSendInfo->fp = fp;
SRpcCtxVal ctxVal = {.v = pMsgSendInfo, .len = sizeof(SMsgSendInfo), .free = schFreeRpcCtxVal};
if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
return TSDB_CODE_SUCCESS;
_return:
taosHashCleanup(pCtx->args);
tfree(param);
tfree(pMsgSendInfo);
SCH_RET(code);
}
int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet* epSet, int32_t msgType, void *msg, uint32_t msgSize) {
int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet* epSet, int32_t msgType, void *msg, uint32_t msgSize, bool persistHandle, SRpcCtx *ctx) {
int32_t code = 0;
SSchTrans *trans = (SSchTrans *)transport;
......@@ -1238,7 +1379,6 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet*
param->taskId = SCH_TASK_ID(pTask);
param->transport = trans->transInst;
pMsgSendInfo->param = param;
pMsgSendInfo->msgInfo.pData = msg;
pMsgSendInfo->msgInfo.len = msgSize;
......@@ -1247,7 +1387,7 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet*
pMsgSendInfo->fp = fp;
int64_t transporterId = 0;
code = asyncSendMsgToServer(trans->transInst, epSet, &transporterId, pMsgSendInfo);
code = asyncSendMsgToServerExt(trans->transInst, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx);
if (code) {
SCH_ERR_JRET(code);
}
......@@ -1267,6 +1407,9 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
void *msg = NULL;
int32_t code = 0;
bool isCandidateAddr = false;
bool persistHandle = false;
SRpcCtx rpcCtx = {0};
if (NULL == addr) {
addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
isCandidateAddr = true;
......@@ -1289,8 +1432,9 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
}
case TDMT_VND_QUERY: {
uint32_t len = strlen(pJob->sql);
SCH_ERR_RET(schMakeQueryRpcCtx(pJob, pTask, &rpcCtx));
uint32_t len = strlen(pJob->sql);
msgSize = sizeof(SSubQueryMsg) + pTask->msgLen + len;
msg = calloc(1, msgSize);
if (NULL == msg) {
......@@ -1311,6 +1455,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
memcpy(pMsg->msg, pJob->sql, len);
memcpy(pMsg->msg + len, pTask->msg, pTask->msgLen);
persistHandle = true;
break;
}
......@@ -1367,6 +1512,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
break;
}
case TDMT_VND_QUERY_HEARTBEAT: {
SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &rpcCtx));
SSchedulerHbReq req = {0};
req.sId = schMgmt.sId;
req.header.vgId = addr->nodeId;
......@@ -1387,6 +1534,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
SCH_JOB_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
persistHandle = true;
break;
}
default:
......@@ -1398,7 +1547,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
SCH_SET_TASK_LASTMSG_TYPE(pTask, msgType);
SSchTrans trans = {.transInst = pJob->transport, .transHandle = pTask ? pTask->handle : NULL};
SCH_ERR_JRET(schAsyncSendMsg(pJob, pTask, &trans, &epSet, msgType, msg, msgSize));
SCH_ERR_JRET(schAsyncSendMsg(pJob, pTask, &trans, &epSet, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL)));
if (isCandidateAddr) {
SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr));
......@@ -1409,7 +1558,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
_return:
SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
schFreeRpcCtx(&rpcCtx);
tfree(msg);
SCH_RET(code);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册