提交 3af9da8f 编写于 作者: D dapan1121

feature/qnode

上级 c3102ecc
......@@ -83,7 +83,7 @@ typedef struct SQWMsg {
} SQWMsg;
typedef struct SQWPhaseInput {
int8_t status;
int8_t taskStatus;
int8_t taskType;
int32_t code;
qTaskInfo_t taskHandle;
......
......@@ -240,14 +240,14 @@ void qwReleaseTaskStatus(int32_t rwType, SQWSchStatus *sch) {
}
int32_t qwAcquireTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWTaskCtx **ctx) {
int32_t qwAcquireTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx **ctx) {
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
QW_LOCK(rwType, &mgmt->ctxLock);
*ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
//QW_LOCK(rwType, &mgmt->ctxLock);
*ctx = taosHashAcquire(mgmt->ctxHash, id, sizeof(id));
if (NULL == (*ctx)) {
QW_UNLOCK(rwType, &mgmt->ctxLock);
//QW_UNLOCK(rwType, &mgmt->ctxLock);
QW_TASK_ELOG("ctx not in ctxHash, id:%s", id);
QW_ERR_RET(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST);
}
......@@ -268,19 +268,19 @@ int32_t qwGetTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tI
return TSDB_CODE_SUCCESS;
}
int32_t qwAddTaskCtxImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, int32_t status, SQWTaskCtx **ctx) {
int32_t qwAddTaskCtxImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, bool acquire, int32_t status, SQWTaskCtx **ctx) {
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
SQWTaskCtx nctx = {0};
QW_LOCK(QW_WRITE, &mgmt->ctxLock);
//QW_LOCK(QW_WRITE, &mgmt->ctxLock);
int32_t code = taosHashPut(mgmt->ctxHash, id, sizeof(id), &nctx, sizeof(SQWTaskCtx));
if (0 != code) {
QW_UNLOCK(QW_WRITE, &mgmt->ctxLock);
//QW_UNLOCK(QW_WRITE, &mgmt->ctxLock);
if (HASH_NODE_EXIST(code)) {
if (rwType && ctx) {
if (acquire && ctx) {
QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), rwType, ctx));
} else if (ctx) {
QW_RET(qwGetTaskCtx(QW_FPARAMS(), ctx));
......@@ -293,9 +293,9 @@ int32_t qwAddTaskCtxImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
}
QW_UNLOCK(QW_WRITE, &mgmt->ctxLock);
//QW_UNLOCK(QW_WRITE, &mgmt->ctxLock);
if (rwType && ctx) {
if (acquire && ctx) {
QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), rwType, ctx));
} else if (ctx) {
QW_RET(qwGetTaskCtx(QW_FPARAMS(), ctx));
......@@ -305,22 +305,23 @@ int32_t qwAddTaskCtxImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_
}
int32_t qwAddTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
QW_RET(qwAddTaskCtxImpl(QW_FPARAMS(), 0, 0, NULL));
QW_RET(qwAddTaskCtxImpl(QW_FPARAMS(), false, 0, NULL));
}
int32_t qwAddAcquireTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWTaskCtx **ctx) {
return qwAddTaskCtxImpl(QW_FPARAMS(), rwType, 0, ctx);
int32_t qwAddAcquireTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx **ctx) {
return qwAddTaskCtxImpl(QW_FPARAMS(), true, 0, ctx);
}
int32_t qwAddGetTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx **ctx) {
return qwAddTaskCtxImpl(QW_FPARAMS(), 0, 0, ctx);
return qwAddTaskCtxImpl(QW_FPARAMS(), false, 0, ctx);
}
void qwReleaseTaskCtx(int32_t rwType, SQWorkerMgmt *mgmt) {
QW_UNLOCK(rwType, &mgmt->ctxLock);
void qwReleaseTaskCtx(SQWorkerMgmt *mgmt, void *ctx) {
//QW_UNLOCK(rwType, &mgmt->ctxLock);
taosHashRelease(mgmt->ctxHash, ctx);
}
void qwFreeTaskHandle(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle) {
......@@ -355,7 +356,7 @@ void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
// Note: NEED CTX HASH LOCKED BEFORE ENTRANCE
int32_t qwDropTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
int32_t qwDropTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType) {
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
SQWTaskCtx octx;
......@@ -367,6 +368,13 @@ int32_t qwDropTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t t
octx = *ctx;
atomic_store_ptr(&ctx->taskHandle, NULL);
atomic_store_ptr(&ctx->sinkHandle, NULL);
if (rwType) {
QW_UNLOCK(rwType, &ctx->lock);
}
if (taosHashRemove(mgmt->ctxHash, id, sizeof(id))) {
QW_TASK_ELOG_E("taosHashRemove from ctx hash failed");
QW_ERR_RET(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST);
......@@ -439,7 +447,7 @@ _return:
QW_RET(code);
}
int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle, DataSinkHandle sinkHandle, int8_t taskType) {
int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle, DataSinkHandle sinkHandle, int8_t taskType, bool execOnce) {
int32_t code = 0;
bool qcontinue = true;
SSDataBlock* pRes = NULL;
......@@ -463,6 +471,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle, DataSinkHandle sinkH
if (TASK_TYPE_TEMP == taskType) {
qwFreeTaskHandle(QW_FPARAMS(), taskHandle);
}
break;
}
......@@ -475,7 +484,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle, DataSinkHandle sinkH
QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", pRes->info.rows, qcontinue);
if (!qcontinue) {
if (execOnce || (!qcontinue)) {
break;
}
}
......@@ -573,7 +582,6 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ
int8_t status = 0;
SQWTaskCtx *ctx = NULL;
bool locked = false;
bool ctxAcquired = false;
void *readyConnection = NULL;
void *dropConnection = NULL;
void *cancelConnection = NULL;
......@@ -582,7 +590,9 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ
switch (phase) {
case QW_PHASE_PRE_QUERY: {
QW_ERR_JRET(qwAddGetTaskCtx(QW_FPARAMS(), &ctx));
QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx));
QW_LOCK(QW_WRITE, &ctx->lock);
atomic_store_32(&ctx->phase, phase);
atomic_store_8(&ctx->taskType, input->taskType);
......@@ -597,7 +607,7 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS()));
QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS()));
QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS(), QW_WRITE));
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_DROPPED;
......@@ -621,7 +631,7 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ
break;
}
case QW_PHASE_POST_QUERY: {
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
QW_LOCK(QW_WRITE, &ctx->lock);
......@@ -641,8 +651,8 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
output->needStop = true;
QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS()));
QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS()));
QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS()));
QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS(), QW_WRITE));
output->rspCode = TSDB_CODE_QRY_TASK_DROPPED;
dropConnection = ctx->dropConnection;
......@@ -665,13 +675,12 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ
}
if (!output->needStop) {
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), input->status));
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), input->taskStatus));
}
break;
}
case QW_PHASE_PRE_FETCH: {
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), QW_READ, &ctx));
ctxAcquired = true;
QW_LOCK(QW_WRITE, &ctx->lock);
......@@ -686,6 +695,13 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
}
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task already dropped, phase:%d", phase);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_DROPPED;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
}
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_TASK_ELOG("drop event at wrong phase, phase:%d", phase);
output->needStop = true;
......@@ -721,7 +737,7 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ
break;
}
case QW_PHASE_POST_FETCH: {
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
QW_LOCK(QW_WRITE, &ctx->lock);
......@@ -743,7 +759,7 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ
output->needStop = true;
QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS()));
QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS()));
QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS(), QW_WRITE));
output->rspCode = TSDB_CODE_QRY_TASK_DROPPED;
dropConnection = ctx->dropConnection;
......@@ -772,7 +788,6 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ
}
case QW_PHASE_PRE_CQUERY: {
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), QW_READ, &ctx));
ctxAcquired = true;
QW_LOCK(QW_WRITE, &ctx->lock);
......@@ -808,7 +823,7 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ
break;
}
case QW_PHASE_POST_CQUERY: {
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
QW_LOCK(QW_WRITE, &ctx->lock);
......@@ -829,8 +844,8 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ
QW_TASK_WLOG("start to drop task, phase:%d", phase);
output->needStop = true;
QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS()));
QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS()));
QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS()));
QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS(), QW_WRITE));
output->rspCode = TSDB_CODE_QRY_TASK_DROPPED;
dropConnection = ctx->dropConnection;
......@@ -871,8 +886,8 @@ _return:
QW_UNLOCK(QW_WRITE, &ctx->lock);
}
if (ctxAcquired && ctx) {
qwReleaseTaskCtx(QW_READ, mgmt);
if (ctx) {
qwReleaseTaskCtx(mgmt, ctx);
}
if (readyConnection) {
......@@ -896,7 +911,7 @@ _return:
}
int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg, int8_t taskType) {
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
int32_t code = 0;
bool queryRsped = false;
bool needStop = false;
......@@ -944,7 +959,7 @@ int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
queryRsped = true;
if (pTaskInfo && sinkHandle) {
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), &pTaskInfo, sinkHandle, taskType));
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), &pTaskInfo, sinkHandle, taskType, true));
}
_return:
......@@ -954,14 +969,8 @@ _return:
}
if (!queryRsped) {
code = qwBuildAndSendQueryRsp(qwMsg->connection, rspCode);
if (TSDB_CODE_SUCCESS == code) {
QW_TASK_DLOG("query msg rsped, code:%d", rspCode);
}
if (TSDB_CODE_SUCCESS == rspCode && code) {
rspCode = code;
}
qwBuildAndSendQueryRsp(qwMsg->connection, rspCode);
QW_TASK_DLOG("query msg rsped, code:%x", rspCode);
}
if (needStop) {
......@@ -971,12 +980,7 @@ _return:
input.code = rspCode;
input.taskHandle = pTaskInfo;
input.sinkHandle = sinkHandle;
if (TSDB_CODE_SUCCESS != rspCode) {
input.status = JOB_TASK_STATUS_FAILED;
} else {
input.status = JOB_TASK_STATUS_PARTIAL_SUCCEED;
}
input.taskStatus = rspCode ? JOB_TASK_STATUS_FAILED : JOB_TASK_STATUS_PARTIAL_SUCCEED;
QW_ERR_RET(qwHandleTaskEvent(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, &output));
......@@ -989,8 +993,8 @@ int32_t qwProcessReady(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
int8_t phase = 0;
bool needRsp = false;
int32_t rspCode = 0;
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
QW_LOCK(QW_WRITE, &ctx->lock);
......@@ -1020,6 +1024,7 @@ _return:
if (ctx) {
QW_UNLOCK(QW_WRITE, &ctx->lock);
qwReleaseTaskCtx(mgmt, ctx);
}
if (needRsp) {
......@@ -1059,7 +1064,7 @@ int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
DataSinkHandle sinkHandle = ctx->sinkHandle;
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), &ctx->taskHandle, sinkHandle, ctx->taskType));
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), &ctx->taskHandle, sinkHandle, ctx->taskType, QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)));
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
SOutputData sOutput = {0};
......@@ -1191,7 +1196,7 @@ int32_t qwProcessDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t t
SQWTaskCtx *ctx = NULL;
bool locked = false;
QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), QW_WRITE, &ctx));
QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx));
QW_LOCK(QW_WRITE, &ctx->lock);
......@@ -1210,7 +1215,7 @@ int32_t qwProcessDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t t
ctx->dropConnection = qwMsg->connection;
} else if (ctx->phase > 0) {
QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS()));
QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS()));
QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS(), QW_WRITE));
locked = false;
needRsp = true;
......@@ -1226,12 +1231,8 @@ _return:
QW_SET_RSP_CODE(ctx, code);
}
if (locked) {
QW_UNLOCK(QW_WRITE, &ctx->lock);
}
if (ctx) {
qwReleaseTaskCtx(QW_WRITE, mgmt);
qwReleaseTaskCtx(mgmt, ctx);
}
if (TSDB_CODE_SUCCESS != code || needRsp) {
......
......@@ -273,7 +273,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
QW_ELOG("invalid query msg, contLen:%d", pMsg->contLen);
QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
......@@ -307,7 +307,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
QW_ELOG("invalid cquery msg, contLen:%d", pMsg->contLen);
QW_ELOG("invalid cquery msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
......@@ -337,7 +337,7 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){
SResReadyReq *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
qError("invalid task status msg");
QW_ELOG("invalid task ready msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
......@@ -398,6 +398,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
QW_ELOG("invalid fetch msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册