From 97d587e379b54deaaea097e7f84530b9e6a69df6 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 24 Mar 2022 14:37:53 +0800 Subject: [PATCH] feature/scheduler --- source/dnode/mgmt/vnode/src/vmWorker.c | 4 +-- source/libs/qworker/inc/qworkerInt.h | 8 +++-- source/libs/qworker/src/qworker.c | 42 +++++++++++++++++++++++--- source/libs/qworker/src/qworkerMsg.c | 14 ++++++--- 4 files changed, 54 insertions(+), 14 deletions(-) diff --git a/source/dnode/mgmt/vnode/src/vmWorker.c b/source/dnode/mgmt/vnode/src/vmWorker.c index 6c7d513c58..c493e44bda 100644 --- a/source/dnode/mgmt/vnode/src/vmWorker.c +++ b/source/dnode/mgmt/vnode/src/vmWorker.c @@ -165,8 +165,8 @@ static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueTyp int32_t code = -1; SMsgHead *pHead = pRpc->pCont; - pHead->contLen = htonl(pHead->contLen); - pHead->vgId = htonl(pHead->vgId); + pHead->contLen = ntohl(pHead->contLen); + pHead->vgId = ntohl(pHead->vgId); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); if (pVnode == NULL) { diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index 41891c7cac..f6bc204227 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -69,6 +69,7 @@ enum { typedef struct SQWDebug { bool lockEnable; bool statusEnable; + bool dumpEnable; } SQWDebug; typedef struct SQWConnInfo { @@ -123,9 +124,9 @@ typedef struct SQWTaskCtx { typedef struct SQWSchStatus { int32_t lastAccessTs; // timestamp in second - SRWLatch connLock; + SRWLatch hbConnLock; SQWConnInfo hbConnInfo; - SQueryNodeEpId epId; + SQueryNodeEpId hbEpId; SRWLatch tasksLock; SHashObj *tasksHash; // key:queryId+taskId, value: SQWTaskStatus } SQWSchStatus; @@ -175,6 +176,9 @@ typedef struct SQWorkerMgmt { #define QW_ELOG(param, ...) qError("QW:%p " param, mgmt, __VA_ARGS__) #define QW_DLOG(param, ...) qDebug("QW:%p " param, mgmt, __VA_ARGS__) +#define QW_DUMP(param, ...) do { if (gQWDebug.dumpEnable) { qDebug("QW:%p " param, mgmt, __VA_ARGS__); } } while (0) + + #define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%"PRIx64" " param, mgmt, sId, __VA_ARGS__) #define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64" " param, mgmt, sId, __VA_ARGS__) diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 07e31d549e..53235cae68 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -9,7 +9,7 @@ #include "tname.h" #include "dataSinkMgt.h" -SQWDebug gQWDebug = {.statusEnable = true}; +SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = true}; int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) { if (!gQWDebug.statusEnable) { @@ -103,6 +103,36 @@ _return: QW_RET(code); } +void qwDbgDumpSchInfo(SQWSchStatus *sch, int32_t i) { + +} + +void qwDbgDumpMgmtInfo(SQWorkerMgmt *mgmt) { + if (!gQWDebug.dumpEnable) { + return; + } + + QW_LOCK(QW_READ, &mgmt->schLock); + + QW_DUMP("total remain schduler num:%d", taosHashGetSize(mgmt->schHash)); + + void *key = NULL; + size_t keyLen = 0; + int32_t i = 0; + SQWSchStatus *sch = NULL; + + void *pIter = taosHashIterate(mgmt->schHash, NULL); + while (pIter) { + sch = (SQWSchStatus *)pIter; + qwDbgDumpSchInfo(sch, i); + ++i; + pIter = taosHashIterate(mgmt->schHash, pIter); + } + + QW_UNLOCK(QW_READ, &mgmt->schLock); + + QW_DUMP("total remain ctx num:%d", taosHashGetSize(mgmt->ctxHash)); +} char *qwPhaseStr(int32_t phase) { switch (phase) { @@ -581,7 +611,7 @@ int32_t qwGenerateSchHbRsp(SQWorkerMgmt *mgmt, SQWSchStatus *sch, SQWHbInfo *hbI int32_t taskNum = 0; hbInfo->connInfo = sch->hbConnInfo; - hbInfo->rsp.epId = sch->epId; + hbInfo->rsp.epId = sch->hbEpId; QW_LOCK(QW_READ, &sch->tasksLock); @@ -1248,16 +1278,16 @@ int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch)); - QW_LOCK(QW_WRITE, &sch->connLock); + QW_LOCK(QW_WRITE, &sch->hbConnLock); if (sch->hbConnInfo.handle) { rpcReleaseHandle(sch->hbConnInfo.handle, TAOS_CONN_SERVER); } memcpy(&sch->hbConnInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo)); - memcpy(&sch->epId, &req->epId, sizeof(req->epId)); + memcpy(&sch->hbEpId, &req->epId, sizeof(req->epId)); - QW_UNLOCK(QW_WRITE, &sch->connLock); + QW_UNLOCK(QW_WRITE, &sch->hbConnLock); QW_DLOG("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, handle:%p, ahandle:%p", req->sId, req->epId.nodeId, req->epId.ep.fqdn, req->epId.ep.port, qwMsg->connInfo.handle, qwMsg->connInfo.ahandle); @@ -1280,6 +1310,8 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { SQWHbInfo *rspList = NULL; int32_t code = 0; + qwDbgDumpMgmtInfo(mgmt); + QW_LOCK(QW_READ, &mgmt->schLock); int32_t schNum = taosHashGetSize(mgmt->schHash); diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index e6a1260de5..97ef53aaea 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -272,11 +272,11 @@ int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - req->header.vgId = mgmt->nodeId; - req->sId = sId; - req->queryId = qId; - req->taskId = tId; - req->refId = rId; + req->header.vgId = htonl(mgmt->nodeId); + req->sId = htobe64(sId); + req->queryId = htobe64(qId); + req->taskId = htobe64(tId); + req->refId = htobe64(rId); SRpcMsg pMsg = { .handle = pConn->handle, @@ -532,6 +532,10 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { qwMsg.connInfo.handle = pMsg->handle; qwMsg.connInfo.ahandle = pMsg->ahandle; + if (TSDB_CODE_RPC_NETWORK_UNAVAIL == pMsg->code) { + QW_SCH_TASK_DLOG("receive drop task due to network broken, error:%s", tstrerror(pMsg->code)); + } + QW_SCH_TASK_DLOG("processDrop start, node:%p, handle:%p", node, pMsg->handle); QW_ERR_RET(qwProcessDrop(QW_FPARAMS(), &qwMsg)); -- GitLab