提交 43336c0a 编写于 作者: D dapan1121

feature/scheduler

上级 f26fe260
...@@ -1112,7 +1112,6 @@ int32_t tDeserializeSSchedulerHbReq(void* buf, int32_t bufLen, SSchedulerHbReq* ...@@ -1112,7 +1112,6 @@ int32_t tDeserializeSSchedulerHbReq(void* buf, int32_t bufLen, SSchedulerHbReq*
void tFreeSSchedulerHbReq(SSchedulerHbReq* pReq); void tFreeSSchedulerHbReq(SSchedulerHbReq* pReq);
typedef struct { typedef struct {
uint64_t seqId;
SQueryNodeEpId epId; SQueryNodeEpId epId;
SArray* taskStatus; // SArray<STaskStatus> SArray* taskStatus; // SArray<STaskStatus>
} SSchedulerHbRsp; } SSchedulerHbRsp;
......
...@@ -150,7 +150,7 @@ int32_t cleanupTaskQueue(); ...@@ -150,7 +150,7 @@ int32_t cleanupTaskQueue();
*/ */
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code); int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code);
int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo, bool persistHandle, SRpcCtx *ctx); int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo, bool persistHandle, void *ctx);
/** /**
* Asynchronously send message to server, after the response received, the callback will be incured. * Asynchronously send message to server, after the response received, the callback will be incured.
......
...@@ -2567,7 +2567,6 @@ int32_t tSerializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp *pR ...@@ -2567,7 +2567,6 @@ int32_t tSerializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp *pR
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1; if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeU64(&encoder, pRsp->seqId) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->epId.nodeId) < 0) return -1; if (tEncodeI32(&encoder, pRsp->epId.nodeId) < 0) return -1;
if (tEncodeU16(&encoder, pRsp->epId.ep.port) < 0) return -1; if (tEncodeU16(&encoder, pRsp->epId.ep.port) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->epId.ep.fqdn) < 0) return -1; if (tEncodeCStr(&encoder, pRsp->epId.ep.fqdn) < 0) return -1;
...@@ -2596,7 +2595,6 @@ int32_t tDeserializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp * ...@@ -2596,7 +2595,6 @@ int32_t tDeserializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp *
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1; if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeU64(&decoder, &pRsp->seqId) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->epId.nodeId) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->epId.nodeId) < 0) return -1;
if (tDecodeU16(&decoder, &pRsp->epId.ep.port) < 0) return -1; if (tDecodeU16(&decoder, &pRsp->epId.ep.port) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->epId.ep.fqdn) < 0) return -1; if (tDecodeCStrTo(&decoder, pRsp->epId.ep.fqdn) < 0) return -1;
......
...@@ -987,6 +987,8 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) { ...@@ -987,6 +987,8 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) {
code = 0; code = 0;
} }
usedbRsp.vgNum = taosArrayGetSize(usedbRsp.pVgroupInfos); usedbRsp.vgNum = taosArrayGetSize(usedbRsp.pVgroupInfos);
// no jump, need to construct rsp
} else { } else {
pDb = mndAcquireDb(pMnode, usedbReq.db); pDb = mndAcquireDb(pMnode, usedbReq.db);
if (pDb == NULL) { if (pDb == NULL) {
......
...@@ -33,9 +33,8 @@ int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -33,9 +33,8 @@ int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
SReadHandle handle = {.reader = pVnode->pTsdb, .meta = pVnode->pMeta, .config = &pVnode->config}; SReadHandle handle = {.reader = pVnode->pTsdb, .meta = pVnode->pMeta, .config = &pVnode->config};
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_VND_QUERY: { case TDMT_VND_QUERY:
return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg); return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg);
}
case TDMT_VND_QUERY_CONTINUE: case TDMT_VND_QUERY_CONTINUE:
return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg); return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg);
default: default:
...@@ -205,7 +204,7 @@ _exit: ...@@ -205,7 +204,7 @@ _exit:
rpcSendResponse(&rpcMsg); rpcSendResponse(&rpcMsg);
return code; return TSDB_CODE_SUCCESS;
} }
static void freeItemHelper(void *pItem) { static void freeItemHelper(void *pItem) {
......
...@@ -140,7 +140,7 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) ...@@ -140,7 +140,7 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code)
return 0; return 0;
} }
int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo, bool persistHandle, SRpcCtx *ctx) { int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo, bool persistHandle, void *rpcCtx) {
char* pMsg = rpcMallocCont(pInfo->msgInfo.len); char* pMsg = rpcMallocCont(pInfo->msgInfo.len);
if (NULL == pMsg) { if (NULL == pMsg) {
qError("0x%" PRIx64 " msg:%s malloc failed", pInfo->requestId, TMSG_INFO(pInfo->msgType)); qError("0x%" PRIx64 " msg:%s malloc failed", pInfo->requestId, TMSG_INFO(pInfo->msgType));
...@@ -163,7 +163,7 @@ int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTra ...@@ -163,7 +163,7 @@ int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTra
assert(pInfo->fp != NULL); assert(pInfo->fp != NULL);
rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, ctx); rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, rpcCtx);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -84,7 +84,7 @@ typedef struct SQWMsg { ...@@ -84,7 +84,7 @@ typedef struct SQWMsg {
typedef struct SQWHbInfo { typedef struct SQWHbInfo {
SSchedulerHbRsp rsp; SSchedulerHbRsp rsp;
void *connection; SQWConnInfo connInfo;
} SQWHbInfo; } SQWHbInfo;
typedef struct SQWPhaseInput { typedef struct SQWPhaseInput {
...@@ -122,8 +122,8 @@ typedef struct SQWTaskCtx { ...@@ -122,8 +122,8 @@ typedef struct SQWTaskCtx {
typedef struct SQWSchStatus { typedef struct SQWSchStatus {
int32_t lastAccessTs; // timestamp in second int32_t lastAccessTs; // timestamp in second
uint64_t hbSeqId; SRWLatch connLock;
SQWConnInfo *hbConnection; SQWConnInfo connInfo;
SRWLatch tasksLock; SRWLatch tasksLock;
SHashObj *tasksHash; // key:queryId+taskId, value: SQWTaskStatus SHashObj *tasksHash; // key:queryId+taskId, value: SQWTaskStatus
} SQWSchStatus; } SQWSchStatus;
...@@ -225,8 +225,6 @@ typedef struct SQWorkerMgmt { ...@@ -225,8 +225,6 @@ typedef struct SQWorkerMgmt {
} \ } \
} while (0) } while (0)
int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -30,17 +30,17 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg); ...@@ -30,17 +30,17 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req); int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req);
int32_t qwBuildAndSendDropRsp(void *connection, int32_t code); int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code);
int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code); int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code);
int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code); int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code);
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete); void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete);
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, void *connection); int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn);
int32_t qwBuildAndSendReadyRsp(void *connection, int32_t code); int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code);
int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code); int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code);
void qwFreeFetchRsp(void *msg); void qwFreeFetchRsp(void *msg);
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp); int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp);
int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp); int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp);
int32_t qwBuildAndSendHbRsp(SRpcMsg *pMsg, SSchedulerHbRsp *rsp, int32_t code); int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *rsp, int32_t code);
int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn); int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn);
......
...@@ -402,7 +402,7 @@ int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { ...@@ -402,7 +402,7 @@ int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
rpcReleaseHandle(ctx->connInfo.handle, CONN_SERVER); rpcReleaseHandle(ctx->connInfo.handle, TAOS_CONN_SERVER);
ctx->connInfo.handle = NULL; ctx->connInfo.handle = NULL;
qwFreeTaskHandle(QW_FPARAMS(), &ctx->taskHandle); qwFreeTaskHandle(QW_FPARAMS(), &ctx->taskHandle);
...@@ -591,8 +591,7 @@ int32_t qwGenerateSchHbRsp(SQWorkerMgmt *mgmt, SQWSchStatus *sch, SQWHbInfo *hbI ...@@ -591,8 +591,7 @@ int32_t qwGenerateSchHbRsp(SQWorkerMgmt *mgmt, SQWSchStatus *sch, SQWHbInfo *hbI
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
hbInfo->connection = sch->hbConnection; hbInfo->connInfo = sch->connInfo;
hbInfo->rsp.seqId = -1;
void *key = NULL; void *key = NULL;
size_t keyLen = 0; size_t keyLen = 0;
...@@ -947,7 +946,7 @@ _return: ...@@ -947,7 +946,7 @@ _return:
QW_TASK_DLOG("query msg rsped, code:%x - %s", code, tstrerror(code)); QW_TASK_DLOG("query msg rsped, code:%x - %s", code, tstrerror(code));
} }
QW_RET(code); QW_RET(TSDB_CODE_SUCCESS);
} }
int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg) { int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
...@@ -1010,7 +1009,7 @@ _return: ...@@ -1010,7 +1009,7 @@ _return:
QW_TASK_DLOG("ready msg rsped, code:%x - %s", code, tstrerror(code)); QW_TASK_DLOG("ready msg rsped, code:%x - %s", code, tstrerror(code));
} }
QW_RET(code); QW_RET(TSDB_CODE_SUCCESS);
} }
...@@ -1081,7 +1080,9 @@ _return: ...@@ -1081,7 +1080,9 @@ _return:
} while (true); } while (true);
input.code = code; input.code = code;
QW_RET(qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, NULL)); qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, NULL);
QW_RET(TSDB_CODE_SUCCESS);
} }
...@@ -1148,7 +1149,7 @@ _return: ...@@ -1148,7 +1149,7 @@ _return:
QW_TASK_DLOG("fetch msg rsped, code:%x, dataLen:%d", code, dataLen); QW_TASK_DLOG("fetch msg rsped, code:%x, dataLen:%d", code, dataLen);
} }
QW_RET(code); QW_RET(TSDB_CODE_SUCCESS);
} }
...@@ -1207,12 +1208,12 @@ _return: ...@@ -1207,12 +1208,12 @@ _return:
} }
if (TSDB_CODE_SUCCESS != code || needRsp) { if (TSDB_CODE_SUCCESS != code || needRsp) {
QW_ERR_RET(qwBuildAndSendDropRsp(&qwMsg->connInfo, code)); qwBuildAndSendDropRsp(&qwMsg->connInfo, code);
QW_TASK_DLOG("drop msg rsped, code:%x", code); QW_TASK_DLOG("drop msg rsped, code:%x", code);
} }
QW_RET(code); QW_RET(TSDB_CODE_SUCCESS);
} }
int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
...@@ -1220,18 +1221,22 @@ int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { ...@@ -1220,18 +1221,22 @@ int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
SSchedulerHbRsp rsp = {0}; SSchedulerHbRsp rsp = {0};
SQWSchStatus *sch = NULL; SQWSchStatus *sch = NULL;
uint64_t seqId = 0; uint64_t seqId = 0;
void *origHandle = NULL;
memcpy(&rsp.epId, &req->epId, sizeof(req->epId)); memcpy(&rsp.epId, &req->epId, sizeof(req->epId));
QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch)); QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch));
atomic_store_ptr(&sch->hbConnection, qwMsg->connInfo); QW_LOCK(QW_WRITE, &sch->connLock);
++sch->hbSeqId;
origHandle = sch->connInfo.handle;
rsp.seqId = sch->hbSeqId; memcpy(&sch->connInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo));
QW_DLOG("hb connection updated, seqId:%" PRIx64 ", sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, handle:%p, ahandle:%p", QW_UNLOCK(QW_WRITE, &sch->connLock);
sch->hbSeqId, req->sId, req->epId.nodeId, req->epId.ep.fqdn, req->epId.ep.port, qwMsg->connInfo.handle, qwMsg->connInfo.ahandle);
QW_DLOG("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, handle:%p, ahandle:%p",
req->sId, req->epId.nodeId, req->epId.ep.fqdn, req->epId.ep.port, qwMsg->connInfo.handle, qwMsg->connInfo.ahandle);
qwReleaseScheduler(QW_READ, mgmt); qwReleaseScheduler(QW_READ, mgmt);
...@@ -1239,7 +1244,7 @@ _return: ...@@ -1239,7 +1244,7 @@ _return:
qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code); qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code);
QW_RET(code); QW_RET(TSDB_CODE_SUCCESS);
} }
...@@ -1288,8 +1293,8 @@ _return: ...@@ -1288,8 +1293,8 @@ _return:
QW_UNLOCK(QW_READ, &mgmt->schLock); QW_UNLOCK(QW_READ, &mgmt->schLock);
for (int32_t j = 0; j < i; ++j) { for (int32_t j = 0; j < i; ++j) {
QW_DLOG("hb on connection %p, taskNum:%d", rspList[j].connection, (rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0)); QW_DLOG("hb on connection handle %p, taskNum:%d", rspList[j].connInfo.handle, (rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0));
qwBuildAndSendHbRsp(rspList[j].connection, &rspList[j].rsp, code); qwBuildAndSendHbRsp(&rspList[j].connInfo, &rspList[j].rsp, code);
tFreeSSchedulerHbRsp(&rspList[j].rsp); tFreeSSchedulerHbRsp(&rspList[j].rsp);
} }
......
...@@ -484,11 +484,15 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -484,11 +484,15 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
msg->taskId = be64toh(msg->taskId); msg->taskId = be64toh(msg->taskId);
msg->refId = be64toh(msg->refId); msg->refId = be64toh(msg->refId);
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle;
//QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId)); //QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId));
_return: _return:
QW_ERR_RET(qwBuildAndSendCancelRsp(pMsg, code)); QW_ERR_RET(qwBuildAndSendCancelRsp(&qwMsg.connInfo, code));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -572,7 +576,7 @@ int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -572,7 +576,7 @@ int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t code = 0; int32_t code = 0;
SVShowTablesReq *pReq = pMsg->pCont; SVShowTablesReq *pReq = pMsg->pCont;
QW_ERR_RET(qwBuildAndSendShowRsp(pMsg, code)); QW_RET(qwBuildAndSendShowRsp(pMsg, code));
} }
int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
...@@ -581,7 +585,7 @@ int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) ...@@ -581,7 +585,7 @@ int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg)
} }
SVShowTablesFetchReq *pFetchReq = pMsg->pCont; SVShowTablesFetchReq *pFetchReq = pMsg->pCont;
QW_ERR_RET(qwBuildAndSendShowFetchRsp(pMsg, pFetchReq)); QW_RET(qwBuildAndSendShowFetchRsp(pMsg, pFetchReq));
} }
...@@ -44,7 +44,6 @@ typedef struct SSchTrans { ...@@ -44,7 +44,6 @@ typedef struct SSchTrans {
typedef struct SSchHbTrans { typedef struct SSchHbTrans {
SRWLatch lock; SRWLatch lock;
uint64_t seqId;
SSchTrans trans; SSchTrans trans;
} SSchHbTrans; } SSchHbTrans;
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "schedulerInt.h" #include "schedulerInt.h"
#include "tmsg.h" #include "tmsg.h"
#include "tref.h" #include "tref.h"
#include "trpc.h"
SSchedulerMgmt schMgmt = {0}; SSchedulerMgmt schMgmt = {0};
...@@ -73,7 +74,7 @@ void schFreeRpcCtx(SRpcCtx *pCtx) { ...@@ -73,7 +74,7 @@ void schFreeRpcCtx(SRpcCtx *pCtx) {
while (pIter) { while (pIter) {
SRpcCtxVal *ctxVal = (SRpcCtxVal *)pIter; SRpcCtxVal *ctxVal = (SRpcCtxVal *)pIter;
ctxVal->free(ctxVal->v); ctxVal->free(ctxVal->val);
pIter = taosHashIterate(pCtx->args, pIter); pIter = taosHashIterate(pCtx->args, pIter);
} }
...@@ -127,7 +128,7 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m ...@@ -127,7 +128,7 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m
} }
SCH_SET_TASK_LASTMSG_TYPE(pTask, -1); SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
return; return TSDB_CODE_SUCCESS;
case TDMT_VND_RES_READY_RSP: case TDMT_VND_RES_READY_RSP:
reqMsgType = TDMT_VND_QUERY; reqMsgType = TDMT_VND_QUERY;
break; break;
...@@ -658,52 +659,42 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { ...@@ -658,52 +659,42 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchHbTrans *trans) { int32_t schRegisterHbConnection(SQueryNodeEpId *epId, bool *exist) {
int32_t code = 0; int32_t code = 0;
SSchHbTrans *hb = NULL; SSchHbTrans hb = {0};
while (true) { code = taosHashPut(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId), &hb, sizeof(SSchHbTrans));
hb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
if (NULL == hb) {
code = taosHashPut(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId), trans, sizeof(SSchHbTrans));
if (code) { if (code) {
if (HASH_NODE_EXIST(code)) { if (HASH_NODE_EXIST(code)) {
continue; *exist = true;
return TSDB_CODE_SUCCESS;
} }
qError("taosHashPut hb trans failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port); qError("taosHashPut hb trans failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
SCH_ERR_RET(code); SCH_ERR_RET(code);
} }
qDebug("hb connection updated, seqId:%" PRIx64 ", sId:%" PRIx64
", nodeId:%d, fqdn:%s, port:%d, instance:%p, connection:%p",
trans->seqId, schMgmt.sId, epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->trans.transInst,
trans->trans.transHandle);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
break;
}
SCH_LOCK(SCH_WRITE, &hb->lock);
if (hb->seqId >= trans->seqId) { int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchHbTrans *trans) {
qDebug("hb trans seqId is old, seqId:%" PRId64 ", currentId:%" PRId64 ", nodeId:%d, fqdn:%s, port:%d", trans->seqId, int32_t code = 0;
hb->seqId, epId->nodeId, epId->ep.fqdn, epId->ep.port); SSchHbTrans *hb = NULL;
SCH_UNLOCK(SCH_WRITE, &hb->lock); hb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
return TSDB_CODE_SUCCESS; if (NULL == hb) {
qError("taosHashGet hb connection failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
SCH_ERR_RET(code);
} }
hb->seqId = trans->seqId; SCH_LOCK(SCH_WRITE, &hb->lock);
memcpy(&hb->trans, &trans->trans, sizeof(trans->trans)); memcpy(&hb->trans, &trans->trans, sizeof(trans->trans));
SCH_UNLOCK(SCH_WRITE, &hb->lock); SCH_UNLOCK(SCH_WRITE, &hb->lock);
qDebug("hb connection updated, seqId:%" PRIx64 ", sId:%" PRIx64 qDebug("hb connection updated, sId:%" PRIx64
", nodeId:%d, fqdn:%s, port:%d, instance:%p, connection:%p", ", nodeId:%d, fqdn:%s, port:%d, instance:%p, connection:%p",
trans->seqId, schMgmt.sId, epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->trans.transInst, schMgmt.sId, epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->trans.transInst,
trans->trans.transHandle); trans->trans.transHandle);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1159,14 +1150,11 @@ int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) { ...@@ -1159,14 +1150,11 @@ int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
if (rsp.seqId != (uint64_t)-1) {
SSchHbTrans trans = {0}; SSchHbTrans trans = {0};
trans.seqId = rsp.seqId;
trans.trans.transInst = pParam->transport; trans.trans.transInst = pParam->transport;
trans.trans.transHandle = pMsg->handle; trans.trans.transHandle = pMsg->handle;
SCH_RET(schUpdateHbConnection(&rsp.epId, &trans)); SCH_RET(schUpdateHbConnection(&rsp.epId, &trans));
}
int32_t taskNum = (int32_t)taosArrayGetSize(rsp.taskStatus); int32_t taskNum = (int32_t)taosArrayGetSize(rsp.taskStatus);
for (int32_t i = 0; i < taskNum; ++i) { for (int32_t i = 0; i < taskNum; ++i) {
...@@ -1268,7 +1256,7 @@ int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) { ...@@ -1268,7 +1256,7 @@ int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
pMsgSendInfo->param = param; pMsgSendInfo->param = param;
pMsgSendInfo->fp = fp; pMsgSendInfo->fp = fp;
SRpcCtxVal ctxVal = {.v = pMsgSendInfo, .len = sizeof(SMsgSendInfo), .free = schFreeRpcCtxVal}; SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .len = sizeof(SMsgSendInfo), .free = schFreeRpcCtxVal};
if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) { if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType); SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
...@@ -1320,7 +1308,7 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) { ...@@ -1320,7 +1308,7 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
pMsgSendInfo->param = param; pMsgSendInfo->param = param;
pMsgSendInfo->fp = fp; pMsgSendInfo->fp = fp;
SRpcCtxVal ctxVal = {.v = pMsgSendInfo, .len = sizeof(SMsgSendInfo), .free = schFreeRpcCtxVal}; SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .len = sizeof(SMsgSendInfo), .free = schFreeRpcCtxVal};
if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) { if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType); SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
...@@ -1557,8 +1545,12 @@ int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) { ...@@ -1557,8 +1545,12 @@ int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) {
SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId)); SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId));
if (NULL == hb) { if (NULL == hb) {
bool exist = false;
SCH_ERR_RET(schRegisterHbConnection(&epId, &exist));
if (!exist) {
SCH_ERR_RET(schBuildAndSendMsg(pJob, NULL, addr, TDMT_VND_QUERY_HEARTBEAT)); SCH_ERR_RET(schBuildAndSendMsg(pJob, NULL, addr, TDMT_VND_QUERY_HEARTBEAT));
} }
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -746,7 +746,7 @@ TEST(queryTest, readyFirstCase) { ...@@ -746,7 +746,7 @@ TEST(queryTest, readyFirstCase) {
SSchJob *pJob = schAcquireJob(job); SSchJob *pJob = schAcquireJob(job);
pIter = taosHashIterate(pJob->execTasks, NULL); void *pIter = taosHashIterate(pJob->execTasks, NULL);
while (pIter) { while (pIter) {
SSchTask *task = *(SSchTask **)pIter; SSchTask *task = *(SSchTask **)pIter;
...@@ -757,7 +757,7 @@ TEST(queryTest, readyFirstCase) { ...@@ -757,7 +757,7 @@ TEST(queryTest, readyFirstCase) {
pIter = taosHashIterate(pJob->execTasks, pIter); pIter = taosHashIterate(pJob->execTasks, pIter);
} }
void *pIter = taosHashIterate(pJob->execTasks, NULL); pIter = taosHashIterate(pJob->execTasks, NULL);
while (pIter) { while (pIter) {
SSchTask *task = *(SSchTask **)pIter; SSchTask *task = *(SSchTask **)pIter;
...@@ -792,11 +792,11 @@ TEST(queryTest, readyFirstCase) { ...@@ -792,11 +792,11 @@ TEST(queryTest, readyFirstCase) {
pthread_attr_t thattr; TdThreadAttr thattr;
pthread_attr_init(&thattr); taosThreadAttrInit(&thattr);
pthread_t thread1; TdThread thread1;
pthread_create(&(thread1), &thattr, schtCreateFetchRspThread, &job); taosThreadCreate(&(thread1), &thattr, schtCreateFetchRspThread, &job);
void *data = NULL; void *data = NULL;
code = schedulerFetchRows(job, &data); code = schedulerFetchRows(job, &data);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册