提交 c277b67d 编写于 作者: D dapan1121

feature/qnode

上级 08541c0f
......@@ -1048,6 +1048,7 @@ typedef struct SSubQueryMsg {
uint64_t sId;
uint64_t queryId;
uint64_t taskId;
int64_t refId;
int8_t taskType;
uint32_t sqlLen; // the query sql,
uint32_t phyLen;
......@@ -1094,19 +1095,57 @@ typedef struct {
typedef struct {
uint64_t queryId;
uint64_t taskId;
int64_t refId;
int8_t status;
} STaskStatus;
typedef struct {
uint32_t num;
STaskStatus status[];
int64_t refId;
SArray *taskStatus; //SArray<STaskStatus>
} SSchedulerStatusRsp;
typedef struct {
uint64_t queryId;
uint64_t taskId;
int8_t action;
} STaskAction;
typedef struct SQueryNodeEpId {
int32_t nodeId; // vgId or qnodeId
SEp ep;
} SQueryNodeEpId;
typedef struct {
SMsgHead header;
uint64_t sId;
SQueryNodeEpId epId;
SArray *taskAction; //SArray<STaskAction>
} SSchedulerHbReq;
int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq);
int32_t tDeserializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq);
void tFreeSSchedulerHbReq(SSchedulerHbReq *pReq);
typedef struct {
uint64_t seqId;
SQueryNodeEpId epId;
SArray *taskStatus; //SArray<STaskStatus>
} SSchedulerHbRsp;
int32_t tSerializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp *pRsp);
int32_t tDeserializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp *pRsp);
void tFreeSSchedulerHbRsp(SSchedulerHbRsp *pRsp);
typedef struct {
SMsgHead header;
uint64_t sId;
uint64_t queryId;
uint64_t taskId;
int64_t refId;
} STaskCancelReq;
typedef struct {
......@@ -1118,6 +1157,7 @@ typedef struct {
uint64_t sId;
uint64_t queryId;
uint64_t taskId;
int64_t refId;
} STaskDropReq;
typedef struct {
......
......@@ -181,6 +181,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_SHOW_TABLES, "vnode-show-tables", SVShowTablesReq, SVShowTablesRsp)
TD_DEF_MSG_TYPE(TDMT_VND_SHOW_TABLES_FETCH, "vnode-show-tables-fetch", SVShowTablesFetchReq, SVShowTablesFetchRsp)
TD_DEF_MSG_TYPE(TDMT_VND_QUERY_CONTINUE, "vnode-query-continue", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_QUERY_HEARTBEAT, "vnode-query-heartbeat", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp)
......
......@@ -133,6 +133,7 @@ typedef struct SQueryNodeAddr {
SEpSet epset;
} SQueryNodeAddr;
typedef struct SQueryNodeStat {
int32_t tableNum; // vg table number, unit is TSDB_TABLE_NUM_UNIT
} SQueryNodeStat;
......
......@@ -72,6 +72,8 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
......
......@@ -2458,3 +2458,149 @@ int32_t tDecodeSMqCMCommitOffsetReq(SCoder *decoder, SMqCMCommitOffsetReq *pReq)
tEndDecode(decoder);
return 0;
}
int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq) {
int32_t headLen = sizeof(SMsgHead);
if (buf != NULL) {
buf = (char *)buf + headLen;
bufLen -= headLen;
}
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeU64(&encoder, pReq->sId) < 0) return -1;
if (tEncodeI32(&encoder, pReq->epId.nodeId) < 0) return -1;
if (tEncodeU16(&encoder, pReq->epId.ep.port) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->epId.ep.fqdn) < 0) return -1;
if (pReq->taskAction) {
int32_t num = taosArrayGetSize(pReq->taskAction);
if (tEncodeI32(&encoder, num) < 0) return -1;
for (int32_t i = 0; i < num; ++i) {
STaskAction *action = taosArrayGet(pReq->taskAction, i);
if (tEncodeU64(&encoder, action->queryId) < 0) return -1;
if (tEncodeU64(&encoder, action->taskId) < 0) return -1;
if (tEncodeI8(&encoder, action->action) < 0) return -1;
}
} else {
if (tEncodeI32(&encoder, 0) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
if (buf != NULL) {
SMsgHead *pHead = (SMsgHead *)((char *)buf - headLen);
pHead->vgId = htonl(pReq->header.vgId);
pHead->contLen = htonl(tlen + headLen);
}
return tlen + headLen;
}
int32_t tDeserializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq) {
int32_t headLen = sizeof(SMsgHead);
SMsgHead *pHead = buf;
pHead->vgId = pReq->header.vgId;
pHead->contLen = pReq->header.contLen;
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, (char *)buf + headLen, bufLen - headLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeU64(&decoder, &pReq->sId) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->epId.nodeId) < 0) return -1;
if (tDecodeU16(&decoder, &pReq->epId.ep.port) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->epId.ep.fqdn) < 0) return -1;
int32_t num = 0;
if (tDecodeI32(&decoder, &num) < 0) return -1;
if (num > 0) {
pReq->taskAction = taosArrayInit(num, sizeof(STaskStatus));
if (NULL == pReq->taskAction) return -1;
for (int32_t i = 0; i < num; ++i) {
STaskAction action = {0};
if (tDecodeU64(&decoder, &action.queryId) < 0) return -1;
if (tDecodeU64(&decoder, &action.taskId) < 0) return -1;
if (tDecodeI8(&decoder, &action.action) < 0) return -1;
taosArrayPush(pReq->taskAction, &action);
}
} else {
pReq->taskAction = NULL;
}
tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
}
void tFreeSSchedulerHbReq(SSchedulerHbReq *pReq) { taosArrayDestroy(pReq->taskAction); }
int32_t tSerializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp *pRsp) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeU64(&encoder, pRsp->seqId) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->epId.nodeId) < 0) return -1;
if (tEncodeU16(&encoder, pRsp->epId.ep.port) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->epId.ep.fqdn) < 0) return -1;
if (pRsp->taskStatus) {
int32_t num = taosArrayGetSize(pRsp->taskStatus);
if (tEncodeI32(&encoder, num) < 0) return -1;
for (int32_t i = 0; i < num; ++i) {
STaskStatus *status = taosArrayGet(pRsp->taskStatus, i);
if (tEncodeU64(&encoder, status->queryId) < 0) return -1;
if (tEncodeU64(&encoder, status->taskId) < 0) return -1;
if (tEncodeI64(&encoder, status->refId) < 0) return -1;
if (tEncodeI8(&encoder, status->status) < 0) return -1;
}
} else {
if (tEncodeI32(&encoder, 0) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp *pRsp) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeU64(&decoder, &pRsp->seqId) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->epId.nodeId) < 0) return -1;
if (tDecodeU16(&decoder, &pRsp->epId.ep.port) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->epId.ep.fqdn) < 0) return -1;
int32_t num = 0;
if (tDecodeI32(&decoder, &num) < 0) return -1;
if (num > 0) {
pRsp->taskStatus = taosArrayInit(num, sizeof(STaskStatus));
if (NULL == pRsp->taskStatus) return -1;
for (int32_t i = 0; i < num; ++i) {
STaskStatus status = {0};
if (tDecodeU64(&decoder, &status.queryId) < 0) return -1;
if (tDecodeU64(&decoder, &status.taskId) < 0) return -1;
if (tDecodeI64(&decoder, &status.refId) < 0) return -1;
if (tDecodeI8(&decoder, &status.status) < 0) return -1;
taosArrayPush(pRsp->taskStatus, &status);
}
} else {
pRsp->taskStatus = NULL;
}
tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
}
void tFreeSSchedulerHbRsp(SSchedulerHbRsp *pRsp) { taosArrayDestroy(pRsp->taskStatus); }
......@@ -152,6 +152,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_REB)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CUR)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CONSUME)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY_HEARTBEAT)] = dndProcessVnodeFetchMsg;
}
static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
......
......@@ -36,6 +36,7 @@ static int32_t mndProcessDropQnodeRsp(SMnodeMsg *pRsp);
static int32_t mndGetQnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveQnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextQnode(SMnode *pMnode, void *pIter);
static int32_t mndProcessQnodeListReq(SMnodeMsg *pReq);
int32_t mndInitQnode(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_QNODE,
......@@ -447,8 +448,8 @@ static int32_t mndProcessQnodeListReq(SMnodeMsg *pReq) {
goto QNODE_LIST_OVER;
}
qlistRsp->epSetList = taosArrayInit(5, sizeof(SEpSet));
if (NULL == qlistRsp->epSetList) {
qlistRsp.epSetList = taosArrayInit(5, sizeof(SEpSet));
if (NULL == qlistRsp.epSetList) {
mError("taosArrayInit epSet failed");
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto QNODE_LIST_OVER;
......@@ -463,12 +464,12 @@ static int32_t mndProcessQnodeListReq(SMnodeMsg *pReq) {
epSet.eps[0].port = pObj->pDnode->port;
epSet.numOfEps = 1;
taosArrayPush(qlistRsp->epSetList, &epSet);
taosArrayPush(qlistRsp.epSetList, &epSet);
numOfRows++;
sdbRelease(pSdb, pObj);
if (qlistReq->rowNum > 0 && numOfRows >= qlistReq->rowNum) {
if (qlistReq.rowNum > 0 && numOfRows >= qlistReq.rowNum) {
break;
}
}
......
......@@ -11,4 +11,7 @@ target_link_libraries(
PRIVATE os
PRIVATE common
PRIVATE util
PRIVATE qworker
PRIVATE qcom
PRIVATE executor
)
\ No newline at end of file
......@@ -28,6 +28,8 @@
extern "C" {
#endif
typedef struct SQWorkerMgmt SQHandle;
typedef struct SQnode {
int32_t qndId;
SQnodeOpt opt;
......
......@@ -14,6 +14,13 @@
*/
#include "qndInt.h"
#include "query.h"
#include "qworker.h"
#include "executor.h"
int32_t qnodePutReqToVQueryQ(SQnode* pQnode, struct SRpcMsg* pReq) {}
void qnodeSendReqToDnode(SQnode* pQnode, struct SEpSet* epSet, struct SRpcMsg* pReq) {}
SQnode *qndOpen(const SQnodeOpt *pOption) {
SQnode *pQnode = calloc(1, sizeof(SQnode));
......@@ -23,7 +30,7 @@ SQnode *qndOpen(const SQnodeOpt *pOption) {
}
if (qWorkerInit(NODE_TYPE_QNODE, pQnode->qndId, NULL, (void **)&pQnode->pQuery, pQnode,
(putReqToQueryQFp)vnodePutReqToVQueryQ, (sendReqToDnodeFp)vnodeSendReqToDnode)) {
(putReqToQueryQFp)qnodePutReqToVQueryQ, (sendReqToDnodeFp)qnodeSendReqToDnode)) {
tfree(pQnode);
return NULL;
}
......@@ -32,7 +39,7 @@ SQnode *qndOpen(const SQnodeOpt *pOption) {
}
void qndClose(SQnode *pQnode) {
qWorkerDestroy(&pQnode->pQuery);
qWorkerDestroy((void **)&pQnode->pQuery);
free(pQnode);
}
......@@ -55,7 +62,7 @@ int qnodeProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg) {
case TDMT_VND_QUERY_CONTINUE:
return qWorkerProcessCQueryMsg(&handle, pQnode->pQuery, pMsg);
default:
vError("unknown msg type:%d in query queue", pMsg->msgType);
qError("unknown msg type:%d in query queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR;
}
}
......@@ -84,7 +91,7 @@ int qnodeProcessFetchMsg(SQnode *pQnode, SRpcMsg *pMsg) {
case TDMT_VND_CONSUME:
//return tqProcessConsumeReq(pQnode->pTq, pMsg);
default:
vError("unknown msg type:%d in fetch queue", pMsg->msgType);
qError("unknown msg type:%d in fetch queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR;
}
}
......
......@@ -26,6 +26,7 @@ extern "C" {
typedef struct SQWorkerMgmt SQHandle;
int vnodeQueryOpen(SVnode *pVnode);
void vnodeQueryClose(SVnode *pVnode);
#ifdef __cplusplus
}
......
......@@ -154,5 +154,6 @@ static void vnodeCloseImpl(SVnode *pVnode) {
tsdbClose(pVnode->pTsdb);
tqClose(pVnode->pTq);
walClose(pVnode->pWal);
vnodeQueryClose(pVnode);
}
}
......@@ -24,6 +24,10 @@ int vnodeQueryOpen(SVnode *pVnode) {
(putReqToQueryQFp)vnodePutReqToVQueryQ, (sendReqToDnodeFp)vnodeSendReqToDnode);
}
void vnodeQueryClose(SVnode *pVnode) {
qWorkerDestroy((void **)&pVnode->pQuery);
}
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
vTrace("message in query queue is processing");
SReadHandle handle = {.reader = pVnode->pTsdb, .meta = pVnode->pMeta};
......@@ -64,6 +68,8 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) {
return vnodeGetTableMeta(pVnode, pMsg);
case TDMT_VND_CONSUME:
return tqProcessConsumeReq(pVnode->pTq, pMsg);
case TDMT_VND_QUERY_HEARTBEAT:
return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg);
default:
vError("unknown msg type:%d in fetch queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR;
......
......@@ -580,7 +580,7 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmt
rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
if (TSDB_CODE_SUCCESS != rpcRsp.code) {
ctgError("error rsp for qnode list, error:%s, db:%s", tstrerror(rpcRsp.code));
ctgError("error rsp for qnode list, error:%s", tstrerror(rpcRsp.code));
CTG_ERR_RET(rpcRsp.code);
}
......@@ -590,7 +590,7 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmt
CTG_ERR_RET(code);
}
ctgDebug("Got qnode list from mnode, listNum:%d", taosArrayGetSize(*out));
ctgDebug("Got qnode list from mnode, listNum:%d", (int32_t)taosArrayGetSize(*out));
return TSDB_CODE_SUCCESS;
}
......
......@@ -21,6 +21,7 @@ extern "C" {
#endif
#include "tlockfree.h"
#include "ttimer.h"
#define QW_DEFAULT_SCHEDULER_NUMBER 10000
#define QW_DEFAULT_TASK_NUMBER 10000
......@@ -84,6 +85,11 @@ typedef struct SQWMsg {
void *connection;
} SQWMsg;
typedef struct SQWHbInfo {
SSchedulerHbRsp rsp;
void *connection;
} SQWHbInfo;
typedef struct SQWPhaseInput {
int8_t taskStatus;
int8_t taskType;
......@@ -97,6 +103,7 @@ typedef struct SQWPhaseOutput {
typedef struct SQWTaskStatus {
int64_t refId; // job's refId
int32_t code;
int8_t status;
} SQWTaskStatus;
......@@ -124,6 +131,8 @@ typedef struct SQWTaskCtx {
typedef struct SQWSchStatus {
int32_t lastAccessTs; // timestamp in second
uint64_t hbSeqId;
void *hbConnection;
SRWLatch tasksLock;
SHashObj *tasksHash; // key:queryId+taskId, value: SQWTaskStatus
} SQWSchStatus;
......@@ -144,8 +153,8 @@ typedef struct SQWorkerMgmt {
sendReqToDnodeFp sendReqFp;
} SQWorkerMgmt;
#define QW_FPARAMS_DEF SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId
#define QW_IDS() sId, qId, tId
#define QW_FPARAMS_DEF SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId
#define QW_IDS() sId, qId, tId, rId
#define QW_FPARAMS() mgmt, QW_IDS()
#define QW_GET_EVENT_VALUE(ctx, event) atomic_load_8(&(ctx)->events[event])
......
......@@ -23,22 +23,24 @@ extern "C" {
#include "qworkerInt.h"
#include "dataSinkMgt.h"
int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg, int8_t taskType);
int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg);
int32_t qwProcessReady(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg);
int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg);
int32_t qwProcessDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg);
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType);
int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req);
int32_t qwBuildAndSendDropRsp(void *connection, int32_t code);
int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code);
int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code);
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete);
int32_t qwBuildAndSendCQueryMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, void *connection);
int32_t qwBuildAndSendSchSinkMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, void *connection);
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, void *connection);
int32_t qwBuildAndSendReadyRsp(void *connection, int32_t code);
int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code);
void qwFreeFetchRsp(void *msg);
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp);
int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp);
int32_t qwBuildAndSendHbRsp(SRpcMsg *pMsg, SSchedulerHbRsp *rsp, int32_t code);
......
......@@ -106,7 +106,7 @@ int32_t qwSetTaskStatus(QW_FPARAMS_DEF, SQWTaskStatus *task, int8_t status) {
}
int32_t qwAddSchedulerImpl(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus **sch) {
int32_t qwAddSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) {
SQWSchStatus newSch = {0};
newSch.tasksHash = taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (NULL == newSch.tasksHash) {
......@@ -132,7 +132,7 @@ int32_t qwAddSchedulerImpl(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus **sch) {
return TSDB_CODE_SUCCESS;
}
int32_t qwAcquireSchedulerImpl(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus **sch, int32_t nOpt) {
int32_t qwAcquireSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch, int32_t nOpt) {
while (true) {
QW_LOCK(rwType, &mgmt->schLock);
*sch = taosHashGet(mgmt->schHash, &sId, sizeof(sId));
......@@ -140,7 +140,7 @@ int32_t qwAcquireSchedulerImpl(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus **sc
QW_UNLOCK(rwType, &mgmt->schLock);
if (QW_NOT_EXIST_ADD == nOpt) {
QW_ERR_RET(qwAddSchedulerImpl(QW_FPARAMS(), rwType, sch));
QW_ERR_RET(qwAddSchedulerImpl(mgmt, sId, rwType, sch));
nOpt = QW_NOT_EXIST_RET_ERR;
......@@ -148,7 +148,7 @@ int32_t qwAcquireSchedulerImpl(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus **sc
} else if (QW_NOT_EXIST_RET_ERR == nOpt) {
QW_RET(TSDB_CODE_QRY_SCH_NOT_EXIST);
} else {
QW_TASK_ELOG("unknown notExistOpt:%d", nOpt);
QW_SCH_ELOG("unknown notExistOpt:%d", nOpt);
QW_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
}
......@@ -159,12 +159,12 @@ int32_t qwAcquireSchedulerImpl(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus **sc
return TSDB_CODE_SUCCESS;
}
int32_t qwAcquireAddScheduler(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus **sch) {
return qwAcquireSchedulerImpl(QW_FPARAMS(), rwType, sch, QW_NOT_EXIST_ADD);
int32_t qwAcquireAddScheduler(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) {
return qwAcquireSchedulerImpl(mgmt, sId, rwType, sch, QW_NOT_EXIST_ADD);
}
int32_t qwAcquireScheduler(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus **sch) {
return qwAcquireSchedulerImpl(QW_FPARAMS(), rwType, sch, QW_NOT_EXIST_RET_ERR);
int32_t qwAcquireScheduler(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) {
return qwAcquireSchedulerImpl(mgmt, sId, rwType, sch, QW_NOT_EXIST_RET_ERR);
}
void qwReleaseScheduler(int32_t rwType, SQWorkerMgmt *mgmt) {
......@@ -196,6 +196,7 @@ int32_t qwAddTaskStatusImpl(QW_FPARAMS_DEF, SQWSchStatus *sch, int32_t rwType, i
SQWTaskStatus ntask = {0};
ntask.status = status;
ntask.refId = rId;
QW_LOCK(QW_WRITE, &sch->tasksLock);
code = taosHashPut(sch->tasksHash, id, sizeof(id), &ntask, sizeof(ntask));
......@@ -225,7 +226,7 @@ int32_t qwAddTaskStatusImpl(QW_FPARAMS_DEF, SQWSchStatus *sch, int32_t rwType, i
int32_t qwAddTaskStatus(QW_FPARAMS_DEF, int32_t status) {
SQWSchStatus *tsch = NULL;
int32_t code = 0;
QW_ERR_RET(qwAcquireAddScheduler(QW_FPARAMS(), QW_READ, &tsch));
QW_ERR_RET(qwAcquireAddScheduler(mgmt, sId, QW_READ, &tsch));
QW_ERR_JRET(qwAddTaskStatusImpl(QW_FPARAMS(), tsch, 0, status, NULL));
......@@ -411,7 +412,7 @@ int32_t qwDropTaskStatus(QW_FPARAMS_DEF) {
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
if (qwAcquireScheduler(QW_FPARAMS(), QW_WRITE, &sch)) {
if (qwAcquireScheduler(mgmt, sId, QW_WRITE, &sch)) {
QW_TASK_WLOG_E("scheduler does not exist");
return TSDB_CODE_SUCCESS;
}
......@@ -443,7 +444,7 @@ int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status) {
SQWTaskStatus *task = NULL;
int32_t code = 0;
QW_ERR_RET(qwAcquireScheduler(QW_FPARAMS(), QW_READ, &sch));
QW_ERR_RET(qwAcquireScheduler(mgmt, sId, QW_READ, &sch));
QW_ERR_JRET(qwAcquireTaskStatus(QW_FPARAMS(), QW_READ, sch, &task));
QW_ERR_JRET(qwSetTaskStatus(QW_FPARAMS(), task, status));
......@@ -521,25 +522,27 @@ _return:
QW_RET(code);
}
int32_t qwGenerateSchHbRsp(SQWorkerMgmt *mgmt, SQWSchStatus *sch, SSchedulerStatusRsp **rsp) {
int32_t qwGenerateSchHbRsp(SQWorkerMgmt *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) {
int32_t taskNum = 0;
QW_LOCK(QW_READ, &sch->tasksLock);
taskNum = taosHashGetSize(sch->tasksHash);
int32_t size = sizeof(SSchedulerStatusRsp) + sizeof((*rsp)->status[0]) * taskNum;
*rsp = calloc(1, size);
if (NULL == *rsp) {
QW_SCH_ELOG("calloc %d failed", size);
hbInfo->rsp.taskStatus = taosArrayInit(taskNum, sizeof(STaskStatus));
if (NULL == hbInfo->rsp.taskStatus) {
QW_UNLOCK(QW_READ, &sch->tasksLock);
QW_ELOG("taosArrayInit taskStatus failed, num:%d", taskNum);
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
hbInfo->connection = sch->hbConnection;
hbInfo->rsp.seqId = -1;
void *key = NULL;
size_t keyLen = 0;
int32_t i = 0;
STaskStatus status = {0};
void *pIter = taosHashIterate(sch->tasksHash, NULL);
while (pIter) {
......@@ -548,8 +551,11 @@ int32_t qwGenerateSchHbRsp(SQWorkerMgmt *mgmt, SQWSchStatus *sch, SSchedulerStat
//TODO GET EXECUTOR API TO GET MORE INFO
QW_GET_QTID(key, (*rsp)->status[i].queryId, (*rsp)->status[i].taskId);
(*rsp)->status[i].status = taskStatus->status;
QW_GET_QTID(key, status.queryId, status.taskId);
status.status = taskStatus->status;
status.refId = taskStatus->refId;
taosArrayPush(hbInfo->rsp.taskStatus, &status);
++i;
pIter = taosHashIterate(sch->tasksHash, pIter);
......@@ -557,8 +563,6 @@ int32_t qwGenerateSchHbRsp(SQWorkerMgmt *mgmt, SQWSchStatus *sch, SSchedulerStat
QW_UNLOCK(QW_READ, &sch->tasksLock);
(*rsp)->num = taskNum;
return TSDB_CODE_SUCCESS;
}
......@@ -1353,11 +1357,36 @@ _return:
QW_RET(code);
}
void qwProcessHbTimer(void *param, void *tmrId) {
int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
int32_t code = 0;
SSchedulerHbRsp rsp = {0};
SQWSchStatus *sch = NULL;
uint64_t seqId = 0;
memcpy(&rsp.epId, &req->epId, sizeof(req->epId));
QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch));
atomic_store_ptr(&sch->hbConnection, qwMsg->connection);
++sch->hbSeqId;
rsp.seqId = sch->hbSeqId;
qwReleaseScheduler(QW_READ, mgmt);
_return:
qwBuildAndSendHbRsp(qwMsg->connection, &rsp, code);
QW_RET(code);
}
void qwProcessHbTimerEvent(void *param, void *tmrId) {
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)param;
SQWSchStatus *sch = NULL;
int32_t taskNum = 0;
SSchedulerStatusRsp **rspList = NULL;
SQWHbInfo *rspList = NULL;
int32_t code = 0;
QW_LOCK(QW_READ, &mgmt->schLock);
......@@ -1365,14 +1394,16 @@ void qwProcessHbTimer(void *param, void *tmrId) {
int32_t schNum = taosHashGetSize(mgmt->schHash);
if (schNum <= 0) {
QW_UNLOCK(QW_READ, &mgmt->schLock);
return TSDB_CODE_SUCCESS;
taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
return;
}
rspList = calloc(schNum, POINTER_BYTES);
rspList = calloc(schNum, sizeof(SQWHbInfo));
if (NULL == rspList) {
QW_UNLOCK(QW_READ, &mgmt->schLock);
QW_ELOG("calloc %d rsp pointer failed", schNum);
return TSDB_CODE_QRY_OUT_OF_MEMORY;
QW_ELOG("calloc %d SQWHbInfo failed", schNum);
taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
return;
}
void *key = NULL;
......@@ -1396,13 +1427,13 @@ _return:
QW_UNLOCK(QW_READ, &mgmt->schLock);
for (int32_t j = 0; j < i; ++j) {
qwBuildAndSendStatusRsp(NULL, rspList[j]); //TODO SCHEDULER CONNECTION
tfree(rspList[j]);
qwBuildAndSendHbRsp(rspList[j].connection, &rspList[j].rsp, code);
tFreeSSchedulerHbRsp(&rspList[j].rsp);
}
tfree(rspList);
QW_RET(code);
taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
}
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, void *nodeObj,
......@@ -1455,7 +1486,7 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW
QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
mgmt->hbTimer = taosTmrStart(qwProcessHbTimer, QW_DEFAULT_HEARTBEAT_MSEC, mgmt, mgmt->timer);
mgmt->hbTimer = taosTmrStart(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, mgmt, mgmt->timer);
if (NULL == mgmt->hbTimer) {
qError("start hb timer failed");
QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
......@@ -1491,6 +1522,9 @@ void qWorkerDestroy(void **qWorkerMgmt) {
}
SQWorkerMgmt *mgmt = *qWorkerMgmt;
taosTmrStopA(&mgmt->hbTimer);
taosTmrCleanUp(mgmt->timer);
//TODO STOP ALL QUERY
......@@ -1500,10 +1534,11 @@ void qWorkerDestroy(void **qWorkerMgmt) {
}
int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp) {
/*
SQWSchStatus *sch = NULL;
int32_t taskNum = 0;
QW_ERR_RET(qwAcquireScheduler(QW_FPARAMS(), QW_READ, &sch));
QW_ERR_RET(qwAcquireScheduler(mgmt, sId, QW_READ, &sch));
sch->lastAccessTs = taosGetTimestampSec();
......@@ -1541,7 +1576,7 @@ int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRs
qwReleaseScheduler(QW_READ, mgmt);
(*rsp)->num = taskNum;
*/
return TSDB_CODE_SUCCESS;
}
......
......@@ -82,30 +82,18 @@ int32_t qwBuildAndSendReadyRsp(void *connection, int32_t code) {
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendStatusRsp(SRpcMsg *pMsg, SSchedulerStatusRsp *sStatus) {
int32_t size = 0;
if (sStatus) {
size = sizeof(SSchedulerStatusRsp) + sizeof(sStatus->status[0]) * sStatus->num;
} else {
size = sizeof(SSchedulerStatusRsp);
}
SSchedulerStatusRsp *pRsp = (SSchedulerStatusRsp *)rpcMallocCont(size);
if (sStatus) {
memcpy(pRsp, sStatus, size);
} else {
pRsp->num = 0;
}
int32_t qwBuildAndSendHbRsp(SRpcMsg *pMsg, SSchedulerHbRsp *pStatus, int32_t code) {
int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus);
void *pRsp = rpcMallocCont(contLen);
tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus);
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_TASKS_STATUS_RSP,
.msgType = TDMT_VND_QUERY_HEARTBEAT_RSP,
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.pCont = pRsp,
.contLen = size,
.code = 0,
.contLen = contLen,
.code = code,
};
rpcSendResponse(&rpcRsp);
......@@ -243,7 +231,7 @@ int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchRe
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendCQueryMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, void *connection) {
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, void *connection) {
SRpcMsg *pMsg = (SRpcMsg *)connection;
SQueryContinueReq * req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq));
if (NULL == req) {
......@@ -294,12 +282,14 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
msg->sId = be64toh(msg->sId);
msg->queryId = be64toh(msg->queryId);
msg->taskId = be64toh(msg->taskId);
msg->refId = be64toh(msg->refId);
msg->phyLen = ntohl(msg->phyLen);
msg->sqlLen = ntohl(msg->sqlLen);
uint64_t sId = msg->sId;
uint64_t qId = msg->queryId;
uint64_t tId = msg->taskId;
int64_t rId = msg->refId;
SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connection = pMsg};
......@@ -331,6 +321,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
uint64_t sId = msg->sId;
uint64_t qId = msg->queryId;
uint64_t tId = msg->taskId;
int64_t rId = 0;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg};
......@@ -362,12 +353,13 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){
uint64_t sId = msg->sId;
uint64_t qId = msg->queryId;
uint64_t tId = msg->taskId;
int64_t rId = 0;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg};
QW_SCH_TASK_DLOG("processReady start, node:%p", node);
QW_ERR_RET(qwProcessReady(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, &qwMsg));
QW_ERR_RET(qwProcessReady(QW_FPARAMS(), &qwMsg));
QW_SCH_TASK_DLOG("processReady end, node:%p", node);
......@@ -392,11 +384,11 @@ int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SSchedulerStatusRsp *sStatus = NULL;
QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->sId, &sStatus));
//QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->sId, &sStatus));
_return:
QW_ERR_RET(qwBuildAndSendStatusRsp(pMsg, sStatus));
//QW_ERR_RET(qwBuildAndSendStatusRsp(pMsg, sStatus));
return TSDB_CODE_SUCCESS;
}
......@@ -421,6 +413,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
uint64_t sId = msg->sId;
uint64_t qId = msg->queryId;
uint64_t tId = msg->taskId;
int64_t rId = 0;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg};
......@@ -450,9 +443,10 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
msg->sId = htobe64(msg->sId);
msg->queryId = htobe64(msg->queryId);
msg->taskId = htobe64(msg->taskId);
msg->sId = be64toh(msg->sId);
msg->queryId = be64toh(msg->queryId);
msg->taskId = be64toh(msg->taskId);
msg->refId = be64toh(msg->refId);
//QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId));
......@@ -480,10 +474,12 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
msg->sId = be64toh(msg->sId);
msg->queryId = be64toh(msg->queryId);
msg->taskId = be64toh(msg->taskId);
msg->refId = be64toh(msg->refId);
uint64_t sId = msg->sId;
uint64_t qId = msg->queryId;
uint64_t tId = msg->taskId;
int64_t rId = msg->refId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg};
......@@ -496,6 +492,39 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
int32_t code = 0;
SSchedulerHbReq req = {0};
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
if (NULL == pMsg->pCont) {
QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
if (tDeserializeSSchedulerHbReq(pMsg->pCont, pMsg->contLen, &req)) {
QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen);
tFreeSSchedulerHbReq(&req);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
uint64_t sId = req.sId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg};
QW_SCH_DLOG("processHb start, node:%p", node);
QW_ERR_RET(qwProcessHb(mgmt, &qwMsg, &req));
QW_SCH_DLOG("processHb end, node:%p", node);
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT;
......
......@@ -43,6 +43,12 @@ typedef struct SSchTrans {
void *transHandle;
} SSchTrans;
typedef struct SSchHbTrans {
SRWLatch lock;
uint64_t seqId;
SSchTrans trans;
} SSchHbTrans;
typedef struct SSchApiStat {
} SSchApiStat;
......@@ -63,18 +69,19 @@ typedef struct SSchedulerStat {
typedef struct SSchedulerMgmt {
uint64_t taskId; // sequential taksId
uint64_t sId; // schedulerId
SSchedulerCfg cfg;
int32_t jobRef;
SSchedulerStat stat;
uint64_t taskId; // sequential taksId
uint64_t sId; // schedulerId
SSchedulerCfg cfg;
int32_t jobRef;
SSchedulerStat stat;
SHashObj *hbConnections;
} SSchedulerMgmt;
typedef struct SSchCallbackParam {
uint64_t queryId;
int64_t refId;
uint64_t taskId;
SEpSet epSet;
void *transport;
} SSchCallbackParam;
typedef struct SSchFlowControl {
......@@ -157,6 +164,10 @@ extern SSchedulerMgmt schMgmt;
#define SCH_TASK_READY_TO_LUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children))
#define SCH_TASK_ID(_task) ((_task) ? (_task)->taskId : -1)
#define SCH_SET_TASK_LASTMSG_TYPE(_task, _type) do { if(_task) { atomic_store_32(&(_task)->lastMsgType, _type); } } while (0)
#define SCH_GET_TASK_LASTMSG_TYPE(_task) ((_task) ? atomic_load_32(&(_task)->lastMsgType) : -1)
#define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN)
#define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY)
#define SCH_TASK_NO_NEED_DROP(task) ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY)
......@@ -183,11 +194,11 @@ extern SSchedulerMgmt schMgmt;
#define SCH_JOB_DLOG(param, ...) qDebug("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
#define SCH_TASK_ELOG(param, ...) \
qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, pJob->queryId, SCH_TASK_ID(pTask), __VA_ARGS__)
#define SCH_TASK_DLOG(param, ...) \
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, pJob->queryId, SCH_TASK_ID(pTask), __VA_ARGS__)
#define SCH_TASK_WLOG(param, ...) \
qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, pJob->queryId, SCH_TASK_ID(pTask), __VA_ARGS__)
#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
......
......@@ -104,7 +104,7 @@ static FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
int32_t lastMsgType = atomic_load_32(&pTask->lastMsgType);
int32_t lastMsgType = SCH_GET_TASK_LASTMSG_TYPE(pTask);
switch (msgType) {
case TDMT_VND_CREATE_TABLE_RSP:
......@@ -130,7 +130,7 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
atomic_store_32(&pTask->lastMsgType, -1);
SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
return TSDB_CODE_SUCCESS;
}
......@@ -622,6 +622,47 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchHbTrans *trans) {
int32_t code = 0;
SSchHbTrans *hb = NULL;
while (true) {
hb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
if (NULL == hb) {
code = taosHashPut(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId), trans, sizeof(SSchHbTrans));
if (code) {
if (HASH_NODE_EXIST(code)) {
continue;
}
qError("taosHashPut hb trans failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
SCH_ERR_RET(code);
}
return TSDB_CODE_SUCCESS;
}
break;
}
SCH_LOCK(SCH_WRITE, &hb->lock);
if (hb->seqId >= trans->seqId) {
qDebug("hb trans seqId is old, seqId:%" PRId64 ", currentId:%" PRId64 ", nodeId:%d, fqdn:%s, port:%d",
trans->seqId, hb->seqId, epId->nodeId, epId->ep.fqdn, epId->ep.port);
SCH_UNLOCK(SCH_WRITE, &hb->lock);
return TSDB_CODE_SUCCESS;
}
hb->seqId = trans->seqId;
memcpy(&hb->trans, &trans->trans, sizeof(trans->trans));
SCH_UNLOCK(SCH_WRITE, &hb->lock);
return TSDB_CODE_SUCCESS;
}
int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCode) {
// if already FAILED, no more processing
SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, status));
......@@ -961,7 +1002,7 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in
SSchCallbackParam *pParam = (SSchCallbackParam *)param;
SSchTask *pTask = NULL;
SSchJob *pJob = taosAcquireRef(schMgmt.jobRef, pParam->refId);
SSchJob *pJob = schAcquireJob(pParam->refId);
if (NULL == pJob) {
qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "taosAcquireRef job failed, may be dropped, refId:%" PRIx64, pParam->queryId, pParam->taskId, pParam->refId);
SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
......@@ -988,7 +1029,7 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in
_return:
if (pJob) {
taosReleaseRef(schMgmt.jobRef, pParam->refId);
schReleaseJob(pParam->refId);
}
tfree(param);
......@@ -1020,6 +1061,55 @@ int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) {
qDebug("QID:%"PRIx64",TID:%"PRIx64" drop task rsp received, code:%x", pParam->queryId, pParam->taskId, code);
}
int32_t schHandleHbCallback(void* param, const SDataBuf* pMsg, int32_t code) {
if (code) {
qError("hb rsp error:%s", tstrerror(code));
SCH_ERR_RET(code);
}
SSchedulerHbRsp rsp = {0};
SSchCallbackParam *pParam = (SSchCallbackParam *)param;
if (tDeserializeSSchedulerHbRsp(pMsg->pData, pMsg->len, &rsp)) {
qError("invalid hb rsp msg, size:%d", pMsg->len);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
if (rsp.seqId != (uint64_t)-1) {
SSchHbTrans trans = {0};
trans.seqId = rsp.seqId;
trans.trans.transInst = pParam->transport;
trans.trans.transHandle = pMsg->handle;
SCH_RET(schUpdateHbConnection(&rsp.epId, &trans));
}
int32_t taskNum = (int32_t)taosArrayGetSize(rsp.taskStatus);
for (int32_t i = 0; i < taskNum; ++i) {
STaskStatus *taskStatus = taosArrayGet(rsp.taskStatus, i);
SSchJob *pJob = schAcquireJob(taskStatus->refId);
if (NULL == pJob) {
qWarn("job not found, refId:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64, taskStatus->refId, taskStatus->queryId, taskStatus->taskId);
//TODO DROP TASK FROM SERVER!!!!
continue;
}
// TODO
schReleaseJob(taskStatus->refId);
}
_return:
tFreeSSchedulerHbRsp(&rsp);
SCH_RET(code);
}
int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
switch (msgType) {
case TDMT_VND_CREATE_TABLE:
......@@ -1040,6 +1130,9 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
case TDMT_VND_DROP_TASK:
*fp = schHandleDropCallback;
break;
case TDMT_VND_QUERY_HEARTBEAT:
*fp = schHandleHbCallback;
break;
default:
qError("unknown msg type for callback, msgType:%d", msgType);
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
......@@ -1071,7 +1164,8 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet*
param->queryId = pJob->queryId;
param->refId = pJob->refId;
param->taskId = pTask->taskId;
param->taskId = SCH_TASK_ID(pTask);
param->transport = trans->transInst;
pMsgSendInfo->param = param;
......@@ -1138,6 +1232,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(pJob->queryId);
pMsg->taskId = htobe64(pTask->taskId);
pMsg->refId = htobe64(pJob->refId);
pMsg->taskType = TASK_TYPE_TEMP;
pMsg->phyLen = htonl(pTask->msgLen);
pMsg->sqlLen = htonl(len);
......@@ -1196,6 +1291,30 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(pJob->queryId);
pMsg->taskId = htobe64(pTask->taskId);
pMsg->refId = htobe64(pJob->refId);
break;
}
case TDMT_VND_QUERY_HEARTBEAT: {
SSchedulerHbReq req = {0};
req.sId = schMgmt.sId;
req.header.vgId = addr->nodeId;
req.epId.nodeId = addr->nodeId;
memcpy(&req.epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp));
msgSize = tSerializeSSchedulerHbReq(NULL, 0, &req);
if (msgSize < 0) {
SCH_JOB_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
msg = calloc(1, msgSize);
if (NULL == msg) {
SCH_JOB_ELOG("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) {
SCH_JOB_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
break;
}
default:
......@@ -1204,9 +1323,9 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
break;
}
atomic_store_32(&pTask->lastMsgType, msgType);
SCH_SET_TASK_LASTMSG_TYPE(pTask, msgType);
SSchTrans trans = {.transInst = pJob->transport, .transHandle = pTask->handle};
SSchTrans trans = {.transInst = pJob->transport, .transHandle = pTask ? pTask->handle : NULL};
SCH_ERR_JRET(schAsyncSendMsg(pJob, pTask, &trans, &epSet, msgType, msg, msgSize));
if (isCandidateAddr) {
......@@ -1217,12 +1336,26 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
_return:
atomic_store_32(&pTask->lastMsgType, -1);
SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
tfree(msg);
SCH_RET(code);
}
int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) {
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
SQueryNodeEpId epId = {0};
epId.nodeId = addr->nodeId;
memcpy(&epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp));
SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId));
if (NULL == hb) {
SCH_ERR_RET(schBuildAndSendMsg(pJob, NULL, addr, TDMT_VND_QUERY_HEARTBEAT));
}
return TSDB_CODE_SUCCESS;
}
int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
int8_t status = 0;
......@@ -1240,7 +1373,7 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
if (NULL == pTask->msg) { // TODO add more detailed reason for failure
code = qSubPlanToString(plan, &pTask->msg, &pTask->msgLen);
if (TSDB_CODE_SUCCESS != code || NULL == pTask->msg || pTask->msgLen <= 0) {
if (TSDB_CODE_SUCCESS != code) {
SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg, pTask->msgLen);
SCH_ERR_RET(code);
} else {
......@@ -1256,7 +1389,10 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXECUTING);
}
if (SCH_IS_QUERY_JOB(pJob)) {
SCH_ERR_RET(schEnsureHbConnection(pJob, pTask));
}
SCH_ERR_RET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType));
return TSDB_CODE_SUCCESS;
......@@ -1459,7 +1595,7 @@ static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan* pD
pJob->status = JOB_TASK_STATUS_NOT_START;
SCH_ERR_JRET(schLaunchJob(pJob));
taosAcquireRef(schMgmt.jobRef, pJob->refId);
schAcquireJob(pJob->refId);
*job = pJob->refId;
......@@ -1470,7 +1606,7 @@ static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan* pD
SCH_JOB_DLOG("job exec done, job status:%d", SCH_GET_JOB_STATUS(pJob));
taosReleaseRef(schMgmt.jobRef, pJob->refId);
schReleaseJob(pJob->refId);
return TSDB_CODE_SUCCESS;
......@@ -1493,6 +1629,9 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
if (schMgmt.cfg.maxJobNum == 0) {
schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_MAX_JOB_NUM;
}
if (schMgmt.cfg.maxNodeTableNum <= 0) {
schMgmt.cfg.maxNodeTableNum = SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM;
}
} else {
schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_MAX_JOB_NUM;
schMgmt.cfg.maxNodeTableNum = SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM;
......@@ -1500,7 +1639,13 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
schMgmt.jobRef = taosOpenRef(schMgmt.cfg.maxJobNum, schFreeJobImpl);
if (schMgmt.jobRef < 0) {
qError("init schduler jobs failed, num:%u", schMgmt.cfg.maxJobNum);
qError("init schduler jobRef failed, num:%u", schMgmt.cfg.maxJobNum);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
schMgmt.hbConnections = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (NULL == schMgmt.hbConnections) {
qError("taosHashInit hb connections failed");
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
......@@ -1521,10 +1666,10 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan* pDag, in
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, true));
SSchJob *job = taosAcquireRef(schMgmt.jobRef, *pJob);
SSchJob *job = schAcquireJob(*pJob);
pRes->code = atomic_load_32(&job->errCode);
pRes->numOfRows = job->resNumOfRows;
taosReleaseRef(schMgmt.jobRef, *pJob);
schReleaseJob(*pJob);
return TSDB_CODE_SUCCESS;
}
......@@ -1573,7 +1718,7 @@ int32_t schedulerConvertDagToTaskList(SQueryPlan* pDag, SArray **pTasks) {
tInfo.addr = plan->execNode;
code = qSubPlanToString(plan, &msg, &msgLen);
if (TSDB_CODE_SUCCESS != code || NULL == msg || msgLen <= 0) {
if (TSDB_CODE_SUCCESS != code) {
qError("subplanToString error, code:%x, msg:%p, len:%d", code, msg, msgLen);
SCH_ERR_JRET(code);
}
......@@ -1667,7 +1812,7 @@ int32_t schedulerFetchRows(int64_t job, void** pData) {
}
int32_t code = 0;
SSchJob *pJob = taosAcquireRef(schMgmt.jobRef, job);
SSchJob *pJob = schAcquireJob(job);
if (NULL == pJob) {
qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
......@@ -1676,19 +1821,19 @@ int32_t schedulerFetchRows(int64_t job, void** pData) {
int8_t status = SCH_GET_JOB_STATUS(pJob);
if (status == JOB_TASK_STATUS_DROPPING) {
SCH_JOB_ELOG("job is dropping, status:%d", status);
taosReleaseRef(schMgmt.jobRef, job);
schReleaseJob(job);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
if (!SCH_JOB_NEED_FETCH(pJob)) {
SCH_JOB_ELOG("no need to fetch data, status:%d", SCH_GET_JOB_STATUS(pJob));
taosReleaseRef(schMgmt.jobRef, job);
schReleaseJob(job);
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
if (atomic_val_compare_exchange_8(&pJob->userFetch, 0, 1) != 0) {
SCH_JOB_ELOG("prior fetching not finished, userFetch:%d", atomic_load_8(&pJob->userFetch));
taosReleaseRef(schMgmt.jobRef, job);
schReleaseJob(job);
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
......@@ -1741,13 +1886,13 @@ _return:
atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0);
taosReleaseRef(schMgmt.jobRef, job);
schReleaseJob(job);
SCH_RET(code);
}
int32_t scheduleCancelJob(int64_t job) {
SSchJob *pJob = taosAcquireRef(schMgmt.jobRef, job);
SSchJob *pJob = schAcquireJob(job);
if (NULL == pJob) {
qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
......@@ -1755,13 +1900,13 @@ int32_t scheduleCancelJob(int64_t job) {
int32_t code = schCancelJob(pJob);
taosReleaseRef(schMgmt.jobRef, job);
schReleaseJob(job);
SCH_RET(code);
}
void schedulerFreeJob(int64_t job) {
SSchJob *pJob = taosAcquireRef(schMgmt.jobRef, job);
SSchJob *pJob = schAcquireJob(job);
if (NULL == pJob) {
qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job);
return;
......@@ -1776,6 +1921,8 @@ void schedulerFreeJob(int64_t job) {
if (taosRemoveRef(schMgmt.jobRef, job)) {
SCH_JOB_ELOG("remove job from job list failed, refId:%" PRIx64, job);
}
schReleaseJob(job);
}
void schedulerFreeTaskList(SArray *taskList) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册