提交 82922152 编写于 作者: D dapan

feature/qnode

上级 51ef795d
...@@ -42,6 +42,8 @@ enum { ...@@ -42,6 +42,8 @@ enum {
QW_EVENT_READY, QW_EVENT_READY,
QW_EVENT_FETCH, QW_EVENT_FETCH,
QW_EVENT_DROP, QW_EVENT_DROP,
QW_EVENT_SCH_SINK,
QW_EVENT_SCH_QUERY,
QW_EVENT_MAX, QW_EVENT_MAX,
}; };
...@@ -100,7 +102,7 @@ typedef struct SQWTaskCtx { ...@@ -100,7 +102,7 @@ typedef struct SQWTaskCtx {
SRWLatch lock; SRWLatch lock;
int32_t phase; int32_t phase;
int8_t sinkInQ; int32_t sinkId;
int8_t queryInQ; int8_t queryInQ;
int8_t events[QW_EVENT_MAX]; int8_t events[QW_EVENT_MAX];
......
...@@ -450,13 +450,51 @@ _return: ...@@ -450,13 +450,51 @@ _return:
QW_RET(code); QW_RET(code);
} }
int32_t qwScheduleDataSink(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx *ctx, SRpcMsg *pMsg) {
if (atomic_load_8(&handles->sinkScheduled)) {
qDebug("data sink already scheduled");
return TSDB_CODE_SUCCESS;
}
SSinkDataReq * req = (SSinkDataReq *)rpcMallocCont(sizeof(SSinkDataReq));
if (NULL == req) {
qError("rpcMallocCont %d failed", (int32_t)sizeof(SSinkDataReq));
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
req->header.vgId = mgmt->nodeId;
req->sId = sId;
req->queryId = qId;
req->taskId = tId;
SRpcMsg pNewMsg = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.msgType = TDMT_VND_SCHEDULE_DATA_SINK,
.pCont = req,
.contLen = sizeof(SSinkDataReq),
.code = 0,
};
int32_t qwGetResFromSink(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg) { int32_t code = (*mgmt->putToQueueFp)(mgmt->nodeObj, &pNewMsg);
if (TSDB_CODE_SUCCESS != code) {
qError("put data sink schedule msg to queue failed, code:%x", code);
rpcFreeCont(req);
QW_ERR_RET(code);
}
qDebug("put data sink schedule msg to query queue");
return TSDB_CODE_SUCCESS;
}
int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) {
int32_t len = 0; int32_t len = 0;
SRetrieveTableRsp *rsp = NULL; SRetrieveTableRsp *rsp = NULL;
bool queryEnd = false; bool queryEnd = false;
int32_t code = 0; int32_t code = 0;
SOutputData output = {0};
dsGetDataLength(ctx->sinkHandle, &len, &queryEnd); dsGetDataLength(ctx->sinkHandle, &len, &queryEnd);
...@@ -467,7 +505,7 @@ int32_t qwGetResFromSink(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_ ...@@ -467,7 +505,7 @@ int32_t qwGetResFromSink(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_
if (len == 0) { if (len == 0) {
if (queryEnd) { if (queryEnd) {
code = dsGetDataBlock(ctx->sinkHandle, &output); code = dsGetDataBlock(ctx->sinkHandle, pOutput);
if (code) { if (code) {
QW_TASK_ELOG("dsGetDataBlock failed, code:%x", code); QW_TASK_ELOG("dsGetDataBlock failed, code:%x", code);
QW_ERR_RET(code); QW_ERR_RET(code);
...@@ -479,10 +517,10 @@ int32_t qwGetResFromSink(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_ ...@@ -479,10 +517,10 @@ int32_t qwGetResFromSink(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_
QW_ERR_RET(qwMallocFetchRsp(len, &rsp)); QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
qwBuildFetchRsp(rsp, &output, 0);
*rspMsg = rsp; *rspMsg = rsp;
*dataLen = 0;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -494,42 +532,33 @@ int32_t qwGetResFromSink(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_ ...@@ -494,42 +532,33 @@ int32_t qwGetResFromSink(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_
// Got data from sink // Got data from sink
*dataLen = len;
// Note: schedule data sink firstly and will schedule query after it's done
if (output.needSchedule) {
QW_TASK_DLOG("sink need schedule, queryEnd:%d", output.queryEnd);
QW_ERR_RET(qwScheduleDataSink(handles, mgmt, sId, qId, tId, pMsg));
} else if ((!output.queryEnd) && (DS_BUF_LOW == output.bufStatus || DS_BUF_EMPTY == output.bufStatus)) {
QW_TASK_DLOG("task not end, need to continue, bufStatus:%d", output.bufStatus);
QW_ERR_RET(qwScheduleQuery(handles, mgmt, sId, qId, tId, pMsg));
}
QW_TASK_DLOG("task got data in sink, dataLength:%d", len); QW_TASK_DLOG("task got data in sink, dataLength:%d", len);
QW_ERR_RET(qwMallocFetchRsp(len, &rsp)); QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
*rspMsg = rsp;
output.pData = rsp->data; pOutput->pData = rsp->data;
code = dsGetDataBlock(ctx->sinkHandle, &output); code = dsGetDataBlock(ctx->sinkHandle, pOutput);
if (code) { if (code) {
QW_TASK_ELOG("dsGetDataBlock failed, code:%x", code); QW_TASK_ELOG("dsGetDataBlock failed, code:%x", code);
qwFreeFetchRsp(rsp); qwFreeFetchRsp(rsp);
QW_ERR_RET(code); QW_ERR_RET(code);
} }
queryEnd = output.queryEnd; queryEnd = pOutput->queryEnd;
output.queryEnd = false; pOutput->queryEnd = false;
if (DS_BUF_EMPTY == output.bufStatus && queryEnd) { if (DS_BUF_EMPTY == pOutput->bufStatus && queryEnd) {
output.queryEnd = true; pOutput->queryEnd = true;
QW_SCH_TASK_DLOG("task all fetched, status:%d", JOB_TASK_STATUS_SUCCEED); QW_SCH_TASK_DLOG("task all fetched, status:%d", JOB_TASK_STATUS_SUCCEED);
QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED)); QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED));
} }
qwBuildFetchRsp(rsp, &output, len);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -783,13 +812,29 @@ int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t ...@@ -783,13 +812,29 @@ int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
} }
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), QW_READ, &ctx)); QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), QW_READ, &ctx));
QW_LOCK(QW_WRITE, &ctx->lock);
QW_LOCK(QW_WRITE, &ctx->lock);
locked = true; locked = true;
SOutputData sOutput = {0};
QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp)); // Note: schedule data sink firstly and will schedule query after it's done
if (sOutput.needSchedule) {
QW_TASK_DLOG("sink need schedule, queryEnd:%d", sOutput.queryEnd);
if (sOutput.needSchedule > ctx.sinkId) {
QW_ERR_RET(qwScheduleDataSink(ctx, mgmt, sId, qId, tId, pMsg));
}
} else if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {
QW_TASK_DLOG("task not end, need to continue, bufStatus:%d", output.bufStatus);
QW_ERR_RET(qwScheduleQuery(ctx, mgmt, sId, qId, tId, pMsg));
}
if (rsp) {
qwBuildFetchRsp(rsp, &sOutput, dataLen);
}
_return: _return:
if (locked) { if (locked) {
...@@ -800,8 +845,12 @@ _return: ...@@ -800,8 +845,12 @@ _return:
qwReleaseTaskCtx(QW_READ, mgmt); qwReleaseTaskCtx(QW_READ, mgmt);
} }
if (needRsp) { if (code) {
qwBuildAndSendFetchRsp(pMsg, rsp, dataLen, code); qwFreeFetchRsp(rsp);
rsp = NULL;
qwBuildAndSendFetchRsp(qwMsg->connection, rsp, 0, code);
} else if (rsp) {
qwBuildAndSendFetchRsp(qwMsg->connection, rsp, dataLen, code);
} }
QW_RET(code); QW_RET(code);
...@@ -910,44 +959,6 @@ void qWorkerDestroy(void **qWorkerMgmt) { ...@@ -910,44 +959,6 @@ void qWorkerDestroy(void **qWorkerMgmt) {
int32_t qwScheduleDataSink(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx *ctx, SRpcMsg *pMsg) {
if (atomic_load_8(&handles->sinkScheduled)) {
qDebug("data sink already scheduled");
return TSDB_CODE_SUCCESS;
}
SSinkDataReq * req = (SSinkDataReq *)rpcMallocCont(sizeof(SSinkDataReq));
if (NULL == req) {
qError("rpcMallocCont %d failed", (int32_t)sizeof(SSinkDataReq));
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
req->header.vgId = mgmt->nodeId;
req->sId = sId;
req->queryId = queryId;
req->taskId = taskId;
SRpcMsg pNewMsg = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.msgType = TDMT_VND_SCHEDULE_DATA_SINK,
.pCont = req,
.contLen = sizeof(SSinkDataReq),
.code = 0,
};
int32_t code = (*mgmt->putToQueueFp)(mgmt->nodeObj, &pNewMsg);
if (TSDB_CODE_SUCCESS != code) {
qError("put data sink schedule msg to queue failed, code:%x", code);
rpcFreeCont(req);
QW_ERR_RET(code);
}
qDebug("put data sink schedule msg to query queue");
return TSDB_CODE_SUCCESS;
}
int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SSchedulerStatusRsp **rsp) { int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SSchedulerStatusRsp **rsp) {
......
...@@ -36,7 +36,9 @@ void qwBuildFetchRsp(SRetrieveTableRsp *rsp, SOutputData *input, int32_t len) { ...@@ -36,7 +36,9 @@ void qwBuildFetchRsp(SRetrieveTableRsp *rsp, SOutputData *input, int32_t len) {
void qwFreeFetchRsp(void *msg) { void qwFreeFetchRsp(void *msg) {
rpcFreeCont(msg); if (msg) {
rpcFreeCont(msg);
}
} }
int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code) { int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code) {
...@@ -106,7 +108,9 @@ int32_t qwBuildAndSendStatusRsp(SRpcMsg *pMsg, SSchedulerStatusRsp *sStatus) { ...@@ -106,7 +108,9 @@ int32_t qwBuildAndSendStatusRsp(SRpcMsg *pMsg, SSchedulerStatusRsp *sStatus) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwBuildAndSendFetchRsp(SRpcMsg *pMsg, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) { int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) {
SRpcMsg *pMsg = (SRpcMsg *)connection;
if (NULL == pRsp) { if (NULL == pRsp) {
pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
memset(pRsp, 0, sizeof(SRetrieveTableRsp)); memset(pRsp, 0, sizeof(SRetrieveTableRsp));
...@@ -264,44 +268,6 @@ _return: ...@@ -264,44 +268,6 @@ _return:
} }
int32_t qwScheduleDataSink(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx *handles, SRpcMsg *pMsg) {
if (atomic_load_8(&handles->sinkScheduled)) {
qDebug("data sink already scheduled");
return TSDB_CODE_SUCCESS;
}
SSinkDataReq * req = (SSinkDataReq *)rpcMallocCont(sizeof(SSinkDataReq));
if (NULL == req) {
qError("rpcMallocCont %d failed", (int32_t)sizeof(SSinkDataReq));
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
req->header.vgId = mgmt->nodeId;
req->sId = sId;
req->queryId = queryId;
req->taskId = taskId;
SRpcMsg pNewMsg = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.msgType = TDMT_VND_SCHEDULE_DATA_SINK,
.pCont = req,
.contLen = sizeof(SSinkDataReq),
.code = 0,
};
int32_t code = (*mgmt->putToQueueFp)(mgmt->nodeObj, &pNewMsg);
if (TSDB_CODE_SUCCESS != code) {
qError("put data sink schedule msg to queue failed, code:%x", code);
rpcFreeCont(req);
QW_ERR_RET(code);
}
qDebug("put data sink schedule msg to query queue");
return TSDB_CODE_SUCCESS;
}
int32_t qwScheduleQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx *handles, SRpcMsg *pMsg) { int32_t qwScheduleQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx *handles, SRpcMsg *pMsg) {
if (atomic_load_8(&handles->queryScheduled)) { if (atomic_load_8(&handles->queryScheduled)) {
QW_SCH_TASK_ELOG("query already scheduled, queryScheduled:%d", handles->queryScheduled); QW_SCH_TASK_ELOG("query already scheduled, queryScheduled:%d", handles->queryScheduled);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册