提交 97d587e3 编写于 作者: D dapan1121

feature/scheduler

上级 99b24c71
...@@ -165,8 +165,8 @@ static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueTyp ...@@ -165,8 +165,8 @@ static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueTyp
int32_t code = -1; int32_t code = -1;
SMsgHead *pHead = pRpc->pCont; SMsgHead *pHead = pRpc->pCont;
pHead->contLen = htonl(pHead->contLen); pHead->contLen = ntohl(pHead->contLen);
pHead->vgId = htonl(pHead->vgId); pHead->vgId = ntohl(pHead->vgId);
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
if (pVnode == NULL) { if (pVnode == NULL) {
......
...@@ -69,6 +69,7 @@ enum { ...@@ -69,6 +69,7 @@ enum {
typedef struct SQWDebug { typedef struct SQWDebug {
bool lockEnable; bool lockEnable;
bool statusEnable; bool statusEnable;
bool dumpEnable;
} SQWDebug; } SQWDebug;
typedef struct SQWConnInfo { typedef struct SQWConnInfo {
...@@ -123,9 +124,9 @@ typedef struct SQWTaskCtx { ...@@ -123,9 +124,9 @@ typedef struct SQWTaskCtx {
typedef struct SQWSchStatus { typedef struct SQWSchStatus {
int32_t lastAccessTs; // timestamp in second int32_t lastAccessTs; // timestamp in second
SRWLatch connLock; SRWLatch hbConnLock;
SQWConnInfo hbConnInfo; SQWConnInfo hbConnInfo;
SQueryNodeEpId epId; SQueryNodeEpId hbEpId;
SRWLatch tasksLock; SRWLatch tasksLock;
SHashObj *tasksHash; // key:queryId+taskId, value: SQWTaskStatus SHashObj *tasksHash; // key:queryId+taskId, value: SQWTaskStatus
} SQWSchStatus; } SQWSchStatus;
...@@ -175,6 +176,9 @@ typedef struct SQWorkerMgmt { ...@@ -175,6 +176,9 @@ typedef struct SQWorkerMgmt {
#define QW_ELOG(param, ...) qError("QW:%p " param, mgmt, __VA_ARGS__) #define QW_ELOG(param, ...) qError("QW:%p " param, mgmt, __VA_ARGS__)
#define QW_DLOG(param, ...) qDebug("QW:%p " param, mgmt, __VA_ARGS__) #define QW_DLOG(param, ...) qDebug("QW:%p " param, mgmt, __VA_ARGS__)
#define QW_DUMP(param, ...) do { if (gQWDebug.dumpEnable) { qDebug("QW:%p " param, mgmt, __VA_ARGS__); } } while (0)
#define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%"PRIx64" " param, mgmt, sId, __VA_ARGS__) #define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%"PRIx64" " param, mgmt, sId, __VA_ARGS__)
#define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64" " param, mgmt, sId, __VA_ARGS__) #define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64" " param, mgmt, sId, __VA_ARGS__)
......
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
#include "tname.h" #include "tname.h"
#include "dataSinkMgt.h" #include "dataSinkMgt.h"
SQWDebug gQWDebug = {.statusEnable = true}; SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = true};
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) { int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) {
if (!gQWDebug.statusEnable) { if (!gQWDebug.statusEnable) {
...@@ -103,6 +103,36 @@ _return: ...@@ -103,6 +103,36 @@ _return:
QW_RET(code); QW_RET(code);
} }
void qwDbgDumpSchInfo(SQWSchStatus *sch, int32_t i) {
}
void qwDbgDumpMgmtInfo(SQWorkerMgmt *mgmt) {
if (!gQWDebug.dumpEnable) {
return;
}
QW_LOCK(QW_READ, &mgmt->schLock);
QW_DUMP("total remain schduler num:%d", taosHashGetSize(mgmt->schHash));
void *key = NULL;
size_t keyLen = 0;
int32_t i = 0;
SQWSchStatus *sch = NULL;
void *pIter = taosHashIterate(mgmt->schHash, NULL);
while (pIter) {
sch = (SQWSchStatus *)pIter;
qwDbgDumpSchInfo(sch, i);
++i;
pIter = taosHashIterate(mgmt->schHash, pIter);
}
QW_UNLOCK(QW_READ, &mgmt->schLock);
QW_DUMP("total remain ctx num:%d", taosHashGetSize(mgmt->ctxHash));
}
char *qwPhaseStr(int32_t phase) { char *qwPhaseStr(int32_t phase) {
switch (phase) { switch (phase) {
...@@ -581,7 +611,7 @@ int32_t qwGenerateSchHbRsp(SQWorkerMgmt *mgmt, SQWSchStatus *sch, SQWHbInfo *hbI ...@@ -581,7 +611,7 @@ int32_t qwGenerateSchHbRsp(SQWorkerMgmt *mgmt, SQWSchStatus *sch, SQWHbInfo *hbI
int32_t taskNum = 0; int32_t taskNum = 0;
hbInfo->connInfo = sch->hbConnInfo; hbInfo->connInfo = sch->hbConnInfo;
hbInfo->rsp.epId = sch->epId; hbInfo->rsp.epId = sch->hbEpId;
QW_LOCK(QW_READ, &sch->tasksLock); QW_LOCK(QW_READ, &sch->tasksLock);
...@@ -1248,16 +1278,16 @@ int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { ...@@ -1248,16 +1278,16 @@ int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch)); QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch));
QW_LOCK(QW_WRITE, &sch->connLock); QW_LOCK(QW_WRITE, &sch->hbConnLock);
if (sch->hbConnInfo.handle) { if (sch->hbConnInfo.handle) {
rpcReleaseHandle(sch->hbConnInfo.handle, TAOS_CONN_SERVER); rpcReleaseHandle(sch->hbConnInfo.handle, TAOS_CONN_SERVER);
} }
memcpy(&sch->hbConnInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo)); memcpy(&sch->hbConnInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo));
memcpy(&sch->epId, &req->epId, sizeof(req->epId)); memcpy(&sch->hbEpId, &req->epId, sizeof(req->epId));
QW_UNLOCK(QW_WRITE, &sch->connLock); QW_UNLOCK(QW_WRITE, &sch->hbConnLock);
QW_DLOG("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, handle:%p, ahandle:%p", QW_DLOG("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, handle:%p, ahandle:%p",
req->sId, req->epId.nodeId, req->epId.ep.fqdn, req->epId.ep.port, qwMsg->connInfo.handle, qwMsg->connInfo.ahandle); req->sId, req->epId.nodeId, req->epId.ep.fqdn, req->epId.ep.port, qwMsg->connInfo.handle, qwMsg->connInfo.ahandle);
...@@ -1280,6 +1310,8 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { ...@@ -1280,6 +1310,8 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
SQWHbInfo *rspList = NULL; SQWHbInfo *rspList = NULL;
int32_t code = 0; int32_t code = 0;
qwDbgDumpMgmtInfo(mgmt);
QW_LOCK(QW_READ, &mgmt->schLock); QW_LOCK(QW_READ, &mgmt->schLock);
int32_t schNum = taosHashGetSize(mgmt->schHash); int32_t schNum = taosHashGetSize(mgmt->schHash);
......
...@@ -272,11 +272,11 @@ int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { ...@@ -272,11 +272,11 @@ int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
req->header.vgId = mgmt->nodeId; req->header.vgId = htonl(mgmt->nodeId);
req->sId = sId; req->sId = htobe64(sId);
req->queryId = qId; req->queryId = htobe64(qId);
req->taskId = tId; req->taskId = htobe64(tId);
req->refId = rId; req->refId = htobe64(rId);
SRpcMsg pMsg = { SRpcMsg pMsg = {
.handle = pConn->handle, .handle = pConn->handle,
...@@ -532,6 +532,10 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -532,6 +532,10 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
qwMsg.connInfo.handle = pMsg->handle; qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle; qwMsg.connInfo.ahandle = pMsg->ahandle;
if (TSDB_CODE_RPC_NETWORK_UNAVAIL == pMsg->code) {
QW_SCH_TASK_DLOG("receive drop task due to network broken, error:%s", tstrerror(pMsg->code));
}
QW_SCH_TASK_DLOG("processDrop start, node:%p, handle:%p", node, pMsg->handle); QW_SCH_TASK_DLOG("processDrop start, node:%p, handle:%p", node, pMsg->handle);
QW_ERR_RET(qwProcessDrop(QW_FPARAMS(), &qwMsg)); QW_ERR_RET(qwProcessDrop(QW_FPARAMS(), &qwMsg));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册