提交 51ef795d 编写于 作者: D dapan1121

feature/qnode

上级 b1980e8f
...@@ -27,14 +27,29 @@ extern "C" { ...@@ -27,14 +27,29 @@ extern "C" {
#define QWORKER_DEFAULT_SCH_TASK_NUMBER 10000 #define QWORKER_DEFAULT_SCH_TASK_NUMBER 10000
enum { enum {
QW_READY_NOT_RECEIVED = 0, QW_PHASE_PRE_QUERY = 1,
QW_READY_RECEIVED, QW_PHASE_POST_QUERY,
QW_READY_RESPONSED, QW_PHASE_PRE_CQUERY,
QW_PHASE_POST_CQUERY,
QW_PHASE_PRE_SINK,
QW_PHASE_POST_SINK,
QW_PHASE_PRE_FETCH,
QW_PHASE_POST_FETCH,
}; };
enum { enum {
QW_TASK_INFO_STATUS = 1, QW_EVENT_CANCEL = 1,
QW_TASK_INFO_READY, QW_EVENT_READY,
QW_EVENT_FETCH,
QW_EVENT_DROP,
QW_EVENT_MAX,
};
enum {
QW_EVENT_NOT_RECEIVED = 0,
QW_EVENT_RECEIVED,
QW_EVENT_PROCESSED,
}; };
enum { enum {
...@@ -57,21 +72,43 @@ enum { ...@@ -57,21 +72,43 @@ enum {
QW_ADD_ACQUIRE, QW_ADD_ACQUIRE,
}; };
typedef struct SQWMsg {
void *node;
char *msg;
int32_t msgLen;
void *connection;
} SQWMsg;
typedef struct SQWPhaseInput {
int8_t status;
int32_t code;
} SQWPhaseInput;
typedef struct SQWPhaseOutput {
int32_t rspCode;
bool needStop;
bool needRsp;
} SQWPhaseOutput;
typedef struct SQWTaskStatus { typedef struct SQWTaskStatus {
SRWLatch lock;
int32_t code; int32_t code;
int8_t status; int8_t status;
int8_t ready;
bool cancel;
bool drop;
} SQWTaskStatus; } SQWTaskStatus;
typedef struct SQWTaskCtx { typedef struct SQWTaskCtx {
SRWLatch lock; SRWLatch lock;
int8_t sinkScheduled; int32_t phase;
int8_t queryScheduled;
int8_t sinkInQ;
int8_t queryInQ;
int8_t events[QW_EVENT_MAX];
int8_t ready;
int8_t cancel;
int8_t drop;
int8_t needRsp;
bool needRsp;
qTaskInfo_t taskHandle; qTaskInfo_t taskHandle;
DataSinkHandle sinkHandle; DataSinkHandle sinkHandle;
} SQWTaskCtx; } SQWTaskCtx;
...@@ -95,6 +132,17 @@ typedef struct SQWorkerMgmt { ...@@ -95,6 +132,17 @@ typedef struct SQWorkerMgmt {
putReqToQueryQFp putToQueueFp; putReqToQueryQFp putToQueueFp;
} SQWorkerMgmt; } SQWorkerMgmt;
#define QW_FPARAMS_DEF SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId
#define QW_IDS() sId, qId, tId
#define QW_FPARAMS() mgmt, QW_IDS()
#define QW_IS_EVENT_RECEIVED(ctx, event) ((ctx)->events[event] == QW_EVENT_RECEIVED)
#define QW_IS_EVENT_PROCESSED(ctx, event) ((ctx)->events[event] == QW_EVENT_PROCESSED)
#define QW_SET_EVENT_RECEIVED(ctx, event) ((ctx)->events[event] = QW_EVENT_RECEIVED)
#define QW_SET_EVENT_PROCESSED(ctx, event) ((ctx)->events[event] = QW_EVENT_PROCESSED)
#define QW_IN_EXECUTOR(ctx) ((ctx)->phase == QW_PHASE_PRE_QUERY || (ctx)->phase == QW_PHASE_PRE_CQUERY || (ctx)->phase == QW_PHASE_PRE_FETCH || (ctx)->phase == QW_PHASE_PRE_SINK)
#define QW_GOT_RES_DATA(data) (true) #define QW_GOT_RES_DATA(data) (true)
#define QW_LOW_RES_DATA(data) (false) #define QW_LOW_RES_DATA(data) (false)
...@@ -103,7 +151,6 @@ typedef struct SQWorkerMgmt { ...@@ -103,7 +151,6 @@ typedef struct SQWorkerMgmt {
#define QW_TASK_READY(status) (status == JOB_TASK_STATUS_SUCCEED || status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED || status == JOB_TASK_STATUS_PARTIAL_SUCCEED) #define QW_TASK_READY(status) (status == JOB_TASK_STATUS_SUCCEED || status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED || status == JOB_TASK_STATUS_PARTIAL_SUCCEED)
#define QW_SET_QTID(id, qId, tId) do { *(uint64_t *)(id) = (qId); *(uint64_t *)((char *)(id) + sizeof(qId)) = (tId); } while (0) #define QW_SET_QTID(id, qId, tId) do { *(uint64_t *)(id) = (qId); *(uint64_t *)((char *)(id) + sizeof(qId)) = (tId); } while (0)
#define QW_GET_QTID(id, qId, tId) do { (qId) = *(uint64_t *)(id); (tId) = *(uint64_t *)((char *)(id) + sizeof(qId)); } while (0) #define QW_GET_QTID(id, qId, tId) do { (qId) = *(uint64_t *)(id); (tId) = *(uint64_t *)((char *)(id) + sizeof(qId)); } while (0)
#define QW_IDS() sId, qId, tId
#define QW_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) #define QW_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define QW_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) #define QW_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
...@@ -160,10 +207,6 @@ typedef struct SQWorkerMgmt { ...@@ -160,10 +207,6 @@ typedef struct SQWorkerMgmt {
int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch);
int32_t qwAcquireAddScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch);
int32_t qwAcquireTask(SQWorkerMgmt *mgmt, int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t tId, SQWTaskStatus **task);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
#include "tname.h" #include "tname.h"
#include "dataSinkMgt.h" #include "dataSinkMgt.h"
int32_t qwValidateStatus(SQWorkerMgmt *mgmt, int8_t oriStatus, int8_t newStatus, uint64_t sId, uint64_t qId, uint64_t tId) { int32_t qwValidateStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t oriStatus, int8_t newStatus) {
int32_t code = 0; int32_t code = 0;
if (oriStatus == newStatus) { if (oriStatus == newStatus) {
...@@ -74,105 +74,85 @@ _return: ...@@ -74,105 +74,85 @@ _return:
QW_RET(code); QW_RET(code);
} }
int32_t qwUpdateTaskInfo(SQWorkerMgmt *mgmt, SQWTaskStatus *task, int8_t type, void *data, uint64_t sId, uint64_t qId, uint64_t tId) { int32_t qwSetTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskStatus *task, int8_t status) {
int32_t code = 0; int32_t code = 0;
int8_t origStatus = 0; int8_t origStatus = 0;
switch (type) { while (true) {
case QW_TASK_INFO_STATUS: { origStatus = atomic_load_8(&task->status);
int8_t newStatus = *(int8_t *)data;
QW_ERR_RET(qwValidateStatus(mgmt, task->status, newStatus, QW_IDS())); QW_ERR_RET(qwValidateStatus(QW_FPARAMS(), origStatus, status));
origStatus = task->status; if (origStatus != atomic_val_compare_exchange_8(&task->status, origStatus, status)) {
task->status = newStatus; continue;
QW_TASK_DLOG("task status updated from %d to %d", origStatus, newStatus);
break;
} }
default:
QW_TASK_ELOG("unknown task info, type:%d", type); QW_TASK_DLOG("task status updated from %d to %d", origStatus, status);
return TSDB_CODE_QRY_APP_ERROR;
break;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwAddTaskHandlesToCache(SQWorkerMgmt *mgmt, uint64_t qId, uint64_t tId, qTaskInfo_t taskHandle, DataSinkHandle sinkHandle) {
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
SQWTaskCtx resCache = {0};
resCache.taskHandle = taskHandle;
resCache.sinkHandle = sinkHandle;
QW_LOCK(QW_WRITE, &mgmt->ctxLock);
if (0 != taosHashPut(mgmt->ctxHash, id, sizeof(id), &resCache, sizeof(SQWTaskCtx))) {
QW_UNLOCK(QW_WRITE, &mgmt->ctxLock);
QW_TASK_ELOG("taosHashPut task ctx to ctxHash failed, taskHandle:%p, sinkHandle:%p", taskHandle, sinkHandle);
return TSDB_CODE_QRY_APP_ERROR;
}
QW_UNLOCK(QW_WRITE, &mgmt->ctxLock);
return TSDB_CODE_SUCCESS;
}
int32_t qwAddScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch) { int32_t qwAddSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWSchStatus **sch) {
SQWSchStatus newSch = {0}; SQWSchStatus newSch = {0};
newSch.tasksHash = taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); newSch.tasksHash = taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (NULL == newSch.tasksHash) { if (NULL == newSch.tasksHash) {
QW_SCH_DLOG("taosHashInit %d failed", mgmt->cfg.maxSchTaskNum); QW_SCH_ELOG("taosHashInit %d failed", mgmt->cfg.maxSchTaskNum);
return TSDB_CODE_QRY_OUT_OF_MEMORY; QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
while (true) { QW_LOCK(QW_WRITE, &mgmt->schLock);
QW_LOCK(QW_WRITE, &mgmt->schLock); int32_t code = taosHashPut(mgmt->schHash, &sId, sizeof(sId), &newSch, sizeof(newSch));
int32_t code = taosHashPut(mgmt->schHash, &sId, sizeof(sId), &newSch, sizeof(newSch)); if (0 != code) {
if (0 != code) { if (!HASH_NODE_EXIST(code)) {
if (!HASH_NODE_EXIST(code)) { QW_UNLOCK(QW_WRITE, &mgmt->schLock);
QW_UNLOCK(QW_WRITE, &mgmt->schLock);
QW_SCH_ELOG("taosHashPut new sch to scheduleHash failed, errno:%d", errno);
taosHashCleanup(newSch.tasksHash);
return TSDB_CODE_QRY_APP_ERROR;
}
}
QW_UNLOCK(QW_WRITE, &mgmt->schLock);
if (TSDB_CODE_SUCCESS == qwAcquireScheduler(rwType, mgmt, sId, sch)) {
if (code) {
taosHashCleanup(newSch.tasksHash);
}
return TSDB_CODE_SUCCESS; QW_SCH_ELOG("taosHashPut new sch to scheduleHash failed, errno:%d", errno);
taosHashCleanup(newSch.tasksHash);
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
taosHashCleanup(newSch.tasksHash);
} }
QW_UNLOCK(QW_WRITE, &mgmt->schLock);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwAcquireSchedulerImpl(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch, int32_t nOpt) { int32_t qwAcquireSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWSchStatus **sch, int32_t nOpt) {
QW_LOCK(rwType, &mgmt->schLock); while (true) {
*sch = taosHashGet(mgmt->schHash, &sId, sizeof(sId)); QW_LOCK(rwType, &mgmt->schLock);
if (NULL == (*sch)) { *sch = taosHashGet(mgmt->schHash, &sId, sizeof(sId));
QW_UNLOCK(rwType, &mgmt->schLock); if (NULL == (*sch)) {
QW_UNLOCK(rwType, &mgmt->schLock);
if (QW_NOT_EXIST_ADD == nOpt) {
return qwAddScheduler(rwType, mgmt, sId, sch); if (QW_NOT_EXIST_ADD == nOpt) {
} else if (QW_NOT_EXIST_RET_ERR == nOpt) { QW_ERR_RET(qwAddSchedulerImpl(rwType, mgmt, sId, sch));
return TSDB_CODE_QRY_SCH_NOT_EXIST;
} else { nOpt = QW_NOT_EXIST_RET_ERR;
assert(0);
continue;
} else if (QW_NOT_EXIST_RET_ERR == nOpt) {
QW_RET(TSDB_CODE_QRY_SCH_NOT_EXIST);
} else {
assert(0);
}
} }
break;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwAcquireAddScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch) { int32_t qwAcquireAddScheduler(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWSchStatus **sch) {
return qwAcquireSchedulerImpl(rwType, mgmt, sId, sch, QW_NOT_EXIST_ADD); return qwAcquireSchedulerImpl(rwType, mgmt, sId, sch, QW_NOT_EXIST_ADD);
} }
int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch) { int32_t qwAcquireScheduler(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWSchStatus **sch) {
return qwAcquireSchedulerImpl(rwType, mgmt, sId, sch, QW_NOT_EXIST_RET_ERR); return qwAcquireSchedulerImpl(rwType, mgmt, sId, sch, QW_NOT_EXIST_RET_ERR);
} }
...@@ -180,7 +160,7 @@ void qwReleaseScheduler(int32_t rwType, SQWorkerMgmt *mgmt) { ...@@ -180,7 +160,7 @@ void qwReleaseScheduler(int32_t rwType, SQWorkerMgmt *mgmt) {
QW_UNLOCK(rwType, &mgmt->schLock); QW_UNLOCK(rwType, &mgmt->schLock);
} }
int32_t qwAddTaskImpl(SQWorkerMgmt *mgmt, SQWSchStatus *sch, int32_t rwType, uint64_t qId, uint64_t tId, int32_t status, int32_t eOpt, SQWTaskStatus **task) { int32_t qwAddTaskStatusImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWSchStatus *sch, int32_t rwType, int32_t status, SQWTaskStatus **task) {
int32_t code = 0; int32_t code = 0;
char id[sizeof(qId) + sizeof(tId)] = {0}; char id[sizeof(qId) + sizeof(tId)] = {0};
...@@ -194,43 +174,42 @@ int32_t qwAddTaskImpl(SQWorkerMgmt *mgmt, SQWSchStatus *sch, int32_t rwType, uin ...@@ -194,43 +174,42 @@ int32_t qwAddTaskImpl(SQWorkerMgmt *mgmt, SQWSchStatus *sch, int32_t rwType, uin
if (0 != code) { if (0 != code) {
QW_UNLOCK(QW_WRITE, &sch->tasksLock); QW_UNLOCK(QW_WRITE, &sch->tasksLock);
if (HASH_NODE_EXIST(code)) { if (HASH_NODE_EXIST(code)) {
if (QW_EXIST_ACQUIRE == eOpt && rwType && task) { if (rwType && task) {
QW_ERR_RET(qwAcquireTask(mgmt, rwType, sch, qId, tId, task)); QW_RET(qwAcquireTaskStatus(QW_FPARAMS(), rwType, sch, task));
} else if (QW_EXIST_RET_ERR == eOpt) {
return TSDB_CODE_QRY_TASK_ALREADY_EXIST;
} else { } else {
assert(0); QW_TASK_ELOG("task status already exist, id:%s", id);
QW_ERR_RET(TSDB_CODE_QRY_TASK_ALREADY_EXIST);
} }
} else { } else {
qError("taosHashPut queryId[%"PRIx64"] taskId[%"PRIx64"] to scheduleHash failed", qId, tId); QW_TASK_ELOG("taosHashPut to tasksHash failed, code:%x", code);
return TSDB_CODE_QRY_APP_ERROR; QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
} }
QW_UNLOCK(QW_WRITE, &sch->tasksLock); QW_UNLOCK(QW_WRITE, &sch->tasksLock);
if (QW_EXIST_ACQUIRE == eOpt && rwType && task) { if (rwType && task) {
QW_ERR_RET(qwAcquireTask(mgmt, rwType, sch, qId, tId, task)); QW_ERR_RET(qwAcquireTaskStatus(QW_FPARAMS(), rwType, sch, task));
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwAddTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t status) { int32_t qwAddTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t status) {
SQWSchStatus *tsch = NULL; SQWSchStatus *tsch = NULL;
int32_t code = 0; int32_t code = 0;
QW_ERR_RET(qwAcquireAddScheduler(QW_READ, mgmt, sId, &tsch)); QW_ERR_RET(qwAcquireAddScheduler(QW_FPARAMS(), QW_READ, &tsch));
QW_ERR_JRET(qwAddTaskImpl(mgmt, tsch, 0, qId, tId, status, QW_EXIST_RET_ERR, NULL)); QW_ERR_JRET(qwAddTaskStatusImpl(QW_FPARAMS(), tsch, 0, status, NULL));
_return: _return:
qwReleaseScheduler(QW_READ, mgmt); qwReleaseScheduler(QW_READ, mgmt);
QW_ERR_RET(code);
QW_RET(code);
} }
int32_t qwAcquireTaskImpl(SQWorkerMgmt *mgmt, int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t tId, int32_t status, int32_t nOpt, SQWTaskStatus **task) { int32_t qwAcquireTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWSchStatus *sch, SQWTaskStatus **task) {
char id[sizeof(qId) + sizeof(tId)] = {0}; char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId); QW_SET_QTID(id, qId, tId);
...@@ -238,936 +217,610 @@ int32_t qwAcquireTaskImpl(SQWorkerMgmt *mgmt, int32_t rwType, SQWSchStatus *sch, ...@@ -238,936 +217,610 @@ int32_t qwAcquireTaskImpl(SQWorkerMgmt *mgmt, int32_t rwType, SQWSchStatus *sch,
*task = taosHashGet(sch->tasksHash, id, sizeof(id)); *task = taosHashGet(sch->tasksHash, id, sizeof(id));
if (NULL == (*task)) { if (NULL == (*task)) {
QW_UNLOCK(rwType, &sch->tasksLock); QW_UNLOCK(rwType, &sch->tasksLock);
QW_ERR_RET(TSDB_CODE_QRY_TASK_NOT_EXIST);
if (QW_NOT_EXIST_ADD == nOpt) {
QW_ERR_RET(qwAddTaskImpl(mgmt, sch, rwType, qId, tId, status, QW_EXIST_ACQUIRE, task));
} else if (QW_NOT_EXIST_RET_ERR == nOpt) {
return TSDB_CODE_QRY_TASK_NOT_EXIST;
} else {
assert(0);
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwAcquireTask(SQWorkerMgmt *mgmt, int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t tId, SQWTaskStatus **task) {
return qwAcquireTaskImpl(mgmt, rwType, sch, qId, tId, 0, QW_NOT_EXIST_RET_ERR, task);
}
int32_t qwAcquireAddTask(SQWorkerMgmt *mgmt, int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t tId, int32_t status, SQWTaskStatus **task) { int32_t qwAddAcquireTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWSchStatus *sch, int32_t status, SQWTaskStatus **task) {
return qwAcquireTaskImpl(mgmt, rwType, sch, qId, tId, status, QW_NOT_EXIST_ADD, task); return qwAddTaskStatusImpl(QW_FPARAMS(), sch, rwType, status, task);
} }
void qwReleaseTask(int32_t rwType, SQWSchStatus *sch) { void qwReleaseTaskStatus(int32_t rwType, SQWSchStatus *sch) {
QW_UNLOCK(rwType, &sch->tasksLock); QW_UNLOCK(rwType, &sch->tasksLock);
} }
int32_t qwAddTaskCtxImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, int32_t status, SQWTaskCtx **ctx) {
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
SQWTaskCtx ctx = {0};
QW_LOCK(QW_WRITE, &mgmt->ctxLock);
int32_t code = taosHashPut(mgmt->ctxHash, id, sizeof(id), &ctx, sizeof(SQWTaskCtx));
if (0 != code) {
QW_UNLOCK(QW_WRITE, &mgmt->ctxLock);
if (HASH_NODE_EXIST(code)) {
if (rwType && ctx) {
QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), rwType, ctx));
} else {
QW_TASK_ELOG("task ctx already exist, id:%s", id);
QW_ERR_RET(TSDB_CODE_QRY_TASK_ALREADY_EXIST);
}
} else {
QW_TASK_ELOG("taosHashPut to ctxHash failed, code:%x", code);
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
}
QW_UNLOCK(QW_WRITE, &mgmt->ctxLock);
if (rwType && ctx) {
QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), rwType, ctx));
}
return TSDB_CODE_SUCCESS;
}
int32_t qwAddTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
QW_RET(qwAddTaskCtxImpl(QW_FPARAMS(), 0, 0, NULL));
}
int32_t qwAcquireTaskCtx(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, SQWTaskCtx **handles) {
char id[sizeof(queryId) + sizeof(taskId)] = {0}; int32_t qwAcquireTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWTaskCtx **ctx) {
QW_SET_QTID(id, queryId, taskId); char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
QW_LOCK(rwType, &mgmt->ctxLock); QW_LOCK(rwType, &mgmt->ctxLock);
*handles = taosHashGet(mgmt->ctxHash, id, sizeof(id)); *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
if (NULL == (*handles)) { if (NULL == (*ctx)) {
QW_UNLOCK(rwType, &mgmt->ctxLock); QW_UNLOCK(rwType, &mgmt->ctxLock);
return TSDB_CODE_QRY_RES_CACHE_NOT_EXIST; QW_TASK_ELOG("ctx not in ctxHash, id:%s", id);
QW_ERR_RET(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void qwReleaseTaskResCache(int32_t rwType, SQWorkerMgmt *mgmt) { int32_t qwAddAcquireTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWTaskCtx **ctx) {
return qwAddTaskCtxImpl(QW_FPARAMS(), rwType, 0, ctx);
}
void qwReleaseTaskCtx(int32_t rwType, SQWorkerMgmt *mgmt) {
QW_UNLOCK(rwType, &mgmt->ctxLock); QW_UNLOCK(rwType, &mgmt->ctxLock);
} }
int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp) { void qwFreeTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx *ctx) {
SQWSchStatus *sch = NULL; if (ctx->taskHandle) {
int32_t taskNum = 0; qDestroyTask(ctx->taskHandle);
ctx->taskHandle = NULL;
}
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch)); // TODO
if (ctx->sinkHandle) {
sch->lastAccessTs = taosGetTimestampSec();
QW_LOCK(QW_READ, &sch->tasksLock);
taskNum = taosHashGetSize(sch->tasksHash);
int32_t size = sizeof(SSchedulerStatusRsp) + sizeof((*rsp)->status[0]) * taskNum;
*rsp = calloc(1, size);
if (NULL == *rsp) {
qError("calloc %d failed", size);
QW_UNLOCK(QW_READ, &sch->tasksLock);
qwReleaseScheduler(QW_READ, mgmt);
return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
}
void *key = NULL;
size_t keyLen = 0;
int32_t i = 0;
void *pIter = taosHashIterate(sch->tasksHash, NULL); // Note: NEED CTX HASH LOCKED BEFORE ENTRANCE
while (pIter) { int32_t qwDropTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
SQWTaskStatus *taskStatus = (SQWTaskStatus *)pIter; char id[sizeof(qId) + sizeof(tId)] = {0};
taosHashGetKey(pIter, &key, &keyLen); QW_SET_QTID(id, qId, tId);
SQWTaskCtx octx;
QW_GET_QTID(key, (*rsp)->status[i].queryId, (*rsp)->status[i].taskId); SQWTaskCtx *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
(*rsp)->status[i].status = taskStatus->status; if (NULL == ctx) {
QW_ERR_RET(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST);
pIter = taosHashIterate(sch->tasksHash, pIter); }
}
QW_UNLOCK(QW_READ, &sch->tasksLock); octx = *ctx;
qwReleaseScheduler(QW_READ, mgmt);
(*rsp)->num = taskNum; if (taosHashRemove(mgmt->ctxHash, id, sizeof(id))) {
QW_TASK_ELOG("taosHashRemove from ctx hash failed, id:%s", id);
QW_ERR_RET(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST);
}
if (octx.taskHandle) {
qDestroyTask(octx.taskHandle);
}
if (octx.sinkHandle) {
dsDestroyDataSinker(octx.sinkHandle);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwDropTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
int32_t qwUpdateSchLastAccess(SQWorkerMgmt *mgmt, uint64_t sId) {
SQWSchStatus *sch = NULL; SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL;
int32_t code = 0;
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch)); if (qwAcquireScheduler(QW_FPARAMS(), QW_WRITE, &sch)) {
QW_TASK_WLOG("scheduler does not exist, id:%s", id);
return TSDB_CODE_SUCCESS;
}
sch->lastAccessTs = taosGetTimestampSec(); if (qwAcquireTaskStatus(QW_FPARAMS(), QW_WRITE, sch, &task)) {
qwReleaseScheduler(QW_WRITE, mgmt);
QW_TASK_WLOG("task does not exist, id:%s", id);
return TSDB_CODE_SUCCESS;
}
qwReleaseScheduler(QW_READ, mgmt); if (taosHashRemove(sch->tasksHash, id, sizeof(id))) {
QW_TASK_ELOG("taosHashRemove task from hash failed, task:%p", task);
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
QW_TASK_DLOG("task dropped, id:%d", id);
_return:
qwReleaseTaskStatus(QW_WRITE, sch);
qwReleaseScheduler(QW_WRITE, mgmt);
QW_RET(code);
}
int32_t qwUpdateTaskCtxHandles(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, qTaskInfo_t taskHandle, DataSinkHandle sinkHandle) {
SQWTaskCtx *ctx = NULL;
QW_ERR_RET(qwAcquireTaskCtx(QW_FPARAMS(), QW_READ, &ctx));
ctx->taskHandle = taskHandle;
ctx->sinkHandle = sinkHandle;
qwReleaseTaskCtx(QW_READ, mgmt);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwUpdateTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t status) { int32_t qwUpdateTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t status) {
SQWSchStatus *sch = NULL; SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL; SQWTaskStatus *task = NULL;
int32_t code = 0; int32_t code = 0;
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch)); QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch));
QW_ERR_JRET(qwAcquireTaskStatus(mgmt, QW_READ, sch, qId, tId, &task));
QW_ERR_JRET(qwAcquireTask(mgmt, QW_READ, sch, qId, tId, &task)); QW_ERR_JRET(qwSetTaskStatus(QW_FPARAMS(), task, status));
QW_LOCK(QW_WRITE, &task->lock);
qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &status, QW_IDS());
QW_UNLOCK(QW_WRITE, &task->lock);
_return: _return:
qwReleaseTask(QW_READ, sch); qwReleaseTaskStatus(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt); qwReleaseScheduler(QW_READ, mgmt);
QW_RET(code); QW_RET(code);
} }
int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, int8_t *taskStatus) { int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, bool *needRsp) {
SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL;
int32_t code = 0; int32_t code = 0;
SQWTaskCtx *ctx = NULL;
bool locked = false;
QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), QW_READ, &ctx));
if (qwAcquireScheduler(QW_READ, mgmt, sId, &sch)) { QW_LOCK(QW_WRITE, &ctx->lock);
*taskStatus = JOB_TASK_STATUS_NULL;
return TSDB_CODE_SUCCESS; locked = true;
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task already dropping", NULL);
QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
} }
if (qwAcquireTask(mgmt, QW_READ, sch, queryId, taskId, &task)) { if (ctx->taskHandle && QW_IN_EXECUTOR(ctx)) {
qwReleaseScheduler(QW_READ, mgmt); QW_ERR_JRET(qKillTask(ctx->taskHandle));
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING));
*taskStatus = JOB_TASK_STATUS_NULL; } else if (ctx->phase > 0) {
return TSDB_CODE_SUCCESS; QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS()));
QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS()));
locked = false;
*needRsp = true;
} }
*taskStatus = task->status; if (locked) {
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
}
qwReleaseTask(QW_READ, sch); _return:
qwReleaseScheduler(QW_READ, mgmt);
if (locked) {
QW_UNLOCK(QW_WRITE, &ctx->lock);
}
if (ctx) {
qwReleaseTaskCtx(QW_READ, mgmt);
}
QW_RET(code); QW_RET(code);
} }
int32_t qwCancelTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) { int32_t qwGetResFromSink(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg) {
SQWSchStatus *sch = NULL; int32_t len = 0;
SQWTaskStatus *task = NULL; SRetrieveTableRsp *rsp = NULL;
bool queryEnd = false;
int32_t code = 0; int32_t code = 0;
SOutputData output = {0};
QW_ERR_RET(qwAcquireAddScheduler(QW_READ, mgmt, sId, &sch)); dsGetDataLength(ctx->sinkHandle, &len, &queryEnd);
QW_ERR_JRET(qwAcquireAddTask(mgmt, QW_READ, sch, qId, tId, JOB_TASK_STATUS_NOT_START, &task)); if (len < 0) {
QW_TASK_ELOG("invalid length from dsGetDataLength, length:%d", len);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
if (len == 0) {
if (queryEnd) {
code = dsGetDataBlock(ctx->sinkHandle, &output);
if (code) {
QW_TASK_ELOG("dsGetDataBlock failed, code:%x", code);
QW_ERR_RET(code);
}
QW_TASK_DLOG("no data in sink and query end", NULL);
QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED));
QW_LOCK(QW_WRITE, &task->lock); QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
task->cancel = true; qwBuildFetchRsp(rsp, &output, 0);
int8_t oriStatus = task->status;
int8_t newStatus = 0;
if (task->status == JOB_TASK_STATUS_CANCELLED || task->status == JOB_TASK_STATUS_NOT_START || task->status == JOB_TASK_STATUS_CANCELLING || task->status == JOB_TASK_STATUS_DROPPING) {
QW_UNLOCK(QW_WRITE, &task->lock);
qwReleaseTask(QW_READ, sch); *rspMsg = rsp;
qwReleaseScheduler(QW_READ, mgmt);
return TSDB_CODE_SUCCESS;
}
QW_TASK_DLOG("no res data in sink, need response later, queryEnd:%d", queryEnd);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (task->status == JOB_TASK_STATUS_FAILED || task->status == JOB_TASK_STATUS_SUCCEED || task->status == JOB_TASK_STATUS_PARTIAL_SUCCEED) { }
newStatus = JOB_TASK_STATUS_CANCELLED;
QW_ERR_JRET(qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &newStatus, QW_IDS()));
} else {
newStatus = JOB_TASK_STATUS_CANCELLING;
QW_ERR_JRET(qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &newStatus, QW_IDS()));
}
QW_UNLOCK(QW_WRITE, &task->lock);
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
if (oriStatus == JOB_TASK_STATUS_EXECUTING) { // Got data from sink
//TODO call executer to cancel subquery async
// Note: schedule data sink firstly and will schedule query after it's done
if (output.needSchedule) {
QW_TASK_DLOG("sink need schedule, queryEnd:%d", output.queryEnd);
QW_ERR_RET(qwScheduleDataSink(handles, mgmt, sId, qId, tId, pMsg));
} else if ((!output.queryEnd) && (DS_BUF_LOW == output.bufStatus || DS_BUF_EMPTY == output.bufStatus)) {
QW_TASK_DLOG("task not end, need to continue, bufStatus:%d", output.bufStatus);
QW_ERR_RET(qwScheduleQuery(handles, mgmt, sId, qId, tId, pMsg));
} }
QW_TASK_DLOG("task got data in sink, dataLength:%d", len);
return TSDB_CODE_SUCCESS; QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
output.pData = rsp->data;
code = dsGetDataBlock(ctx->sinkHandle, &output);
if (code) {
QW_TASK_ELOG("dsGetDataBlock failed, code:%x", code);
qwFreeFetchRsp(rsp);
QW_ERR_RET(code);
}
_return: queryEnd = output.queryEnd;
output.queryEnd = false;
if (task) { if (DS_BUF_EMPTY == output.bufStatus && queryEnd) {
QW_UNLOCK(QW_WRITE, &task->lock); output.queryEnd = true;
qwReleaseTask(QW_READ, sch); QW_SCH_TASK_DLOG("task all fetched, status:%d", JOB_TASK_STATUS_SUCCEED);
QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED));
} }
if (sch) { qwBuildFetchRsp(rsp, &output, len);
qwReleaseScheduler(QW_READ, mgmt);
}
QW_RET(code); return TSDB_CODE_SUCCESS;
} }
// caller should make sure task is not running int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
int32_t qwDropTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) { int32_t code = 0;
char id[sizeof(qId) + sizeof(tId)] = {0}; int8_t status = 0;
QW_SET_QTID(id, qId, tId); SQWTaskCtx *ctx = NULL;
bool locked = false;
QW_LOCK(QW_WRITE, &mgmt->ctxLock); switch (phase) {
SQWTaskCtx *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id)); case QW_PHASE_PRE_QUERY: {
if (NULL == ctx) { QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), QW_READ, &ctx));
QW_UNLOCK(QW_WRITE, &mgmt->ctxLock);
return TSDB_CODE_QRY_RES_CACHE_NOT_EXIST; QW_LOCK(QW_WRITE, &ctx->lock);
}
locked = true;
if (ctx->taskHandle) {
qDestroyTask(ctx->taskHandle); assert(!QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL));
ctx->taskHandle = NULL;
} if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
output->needStop = true;
if (ctx->sinkHandle) {
dsDestroyDataSinker(ctx->sinkHandle); QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS()));
ctx->sinkHandle = NULL; QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS()));
}
output->rspCode = TSDB_CODE_QRY_TASK_DROPPED;
if (taosHashRemove(mgmt->ctxHash, id, sizeof(id))) {
QW_TASK_ELOG("taosHashRemove from ctx hash failed, id:%s", id); // Note: ctx freed, no need to unlock it
locked = false;
QW_UNLOCK(QW_WRITE, &mgmt->ctxLock); } else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) {
return TSDB_CODE_QRY_RES_CACHE_NOT_EXIST; output->needStop = true;
}
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED));
QW_UNLOCK(QW_WRITE, &mgmt->ctxLock); qwFreeTask(QW_FPARAMS(), ctx);
return TSDB_CODE_SUCCESS; QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL);
}
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
}
int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL;
int32_t code = 0;
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
qwDropTaskCtx(mgmt, sId, qId, tId);
if (qwAcquireScheduler(QW_WRITE, mgmt, sId, &sch)) {
QW_TASK_WLOG("scheduler does not exist, sch:%p", sch);
return TSDB_CODE_SUCCESS;
}
if (qwAcquireTask(mgmt, QW_WRITE, sch, qId, tId, &task)) {
qwReleaseScheduler(QW_WRITE, mgmt);
QW_TASK_WLOG("task does not exist, task:%p", task);
return TSDB_CODE_SUCCESS;
}
QW_TASK_DLOG("drop task, status:%d, code:%x, ready:%d, cancel:%d, drop:%d", task->status, task->code, task->ready, task->cancel, task->drop);
if (taosHashRemove(sch->tasksHash, id, sizeof(id))) {
QW_TASK_ELOG("taosHashRemove task from hash failed, task:%p", task);
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
_return:
qwReleaseTask(QW_WRITE, sch);
qwReleaseScheduler(QW_WRITE, mgmt);
QW_RET(code);
}
int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) { if (!output->needStop) {
SQWSchStatus *sch = NULL; QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING));
SQWTaskStatus *task = NULL; }
int32_t code = 0; break;
}
case QW_PHASE_POST_QUERY: {
QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), QW_READ, &ctx));
QW_LOCK(QW_WRITE, &ctx->lock);
locked = true;
assert(!QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL));
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
output->needStop = true;
QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS()));
QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS()));
output->rspCode = TSDB_CODE_QRY_TASK_DROPPED;
// Note: ctx freed, no need to unlock it
locked = false;
} else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) {
output->needStop = true;
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED));
qwFreeTask(QW_FPARAMS(), ctx);
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL);
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
} else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_READY)) {
output->needRsp = true;
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
output->rspCode = input.code;
}
QW_ERR_RET(qwAcquireAddScheduler(QW_READ, mgmt, sId, &sch)); if (!output->needStop) {
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), input.status));
}
break;
}
case QW_PHASE_PRE_FETCH: {
QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), QW_READ, &ctx));
QW_LOCK(QW_WRITE, &ctx->lock);
locked = true;
QW_ERR_JRET(qwAcquireAddTask(mgmt, QW_READ, sch, qId, tId, JOB_TASK_STATUS_NOT_START, &task)); if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) {
QW_TASK_WLOG("task already cancelled", NULL);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
}
QW_LOCK(QW_WRITE, &task->lock); if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task is dropping", NULL);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_DROPPING;
} else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) {
QW_TASK_WLOG("task is cancelling", NULL);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLING;
}
task->drop = true; if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
QW_TASK_WLOG("last fetch not finished", NULL);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_DUPLICATTED_OPERATION;
QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
}
int8_t oriStatus = task->status; if (!QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_READY)) {
int8_t newStatus = 0; QW_TASK_ELOG("query rsp are not ready", NULL);
output->needStop = true;
if (task->status == JOB_TASK_STATUS_EXECUTING) { output->rspCode = TSDB_CODE_QRY_TASK_MSG_ERROR;
newStatus = JOB_TASK_STATUS_DROPPING; QW_ERR_JRET(TSDB_CODE_QRY_TASK_MSG_ERROR);
QW_ERR_JRET(qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &newStatus, QW_IDS())); }
} else if (task->status == JOB_TASK_STATUS_CANCELLING || task->status == JOB_TASK_STATUS_DROPPING || task->status == JOB_TASK_STATUS_NOT_START) { break;
QW_UNLOCK(QW_WRITE, &task->lock); }
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
return TSDB_CODE_SUCCESS;
} else {
QW_UNLOCK(QW_WRITE, &task->lock);
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
QW_ERR_RET(qwDropTask(mgmt, sId, qId, tId));
return TSDB_CODE_SUCCESS;
} }
QW_UNLOCK(QW_WRITE, &task->lock);
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
if (oriStatus == JOB_TASK_STATUS_EXECUTING) {
//TODO call executer to cancel subquery async
}
return TSDB_CODE_SUCCESS;
_return: _return:
if (task) { if (locked) {
QW_UNLOCK(QW_WRITE, &task->lock); ctx->phase = phase;
qwReleaseTask(QW_READ, sch); QW_UNLOCK(QW_WRITE, &ctx->lock);
} }
if (sch) { if (ctx) {
qwReleaseScheduler(QW_READ, mgmt); qwReleaseTaskCtx(QW_READ, mgmt);
} }
QW_RET(code); QW_RET(code);
} }
int32_t qwBuildAndSendQueryRsp(SRpcMsg *pMsg, int32_t code) {
SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->code = code;
SRpcMsg rpcRsp = { int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg) {
.handle = pMsg->handle, int32_t code = 0;
.ahandle = pMsg->ahandle, bool queryRsped = false;
.pCont = pRsp, bool needStop = false;
.contLen = sizeof(*pRsp), struct SSubplan *plan = NULL;
.code = code, int32_t rspCode = 0;
}; SQWPhaseInput input = {0};
SQWPhaseOutput output = {0};
rpcSendResponse(&rpcRsp);
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendReadyRsp(SRpcMsg *pMsg, int32_t code) {
SResReadyRsp *pRsp = (SResReadyRsp *)rpcMallocCont(sizeof(SResReadyRsp));
pRsp->code = code;
SRpcMsg rpcRsp = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = code,
};
rpcSendResponse(&rpcRsp);
return TSDB_CODE_SUCCESS; QW_ERR_JRET(qwHandleTaskEvent(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, &output));
}
int32_t qwBuildAndSendStatusRsp(SRpcMsg *pMsg, SSchedulerStatusRsp *sStatus) { needStop = output.needStop;
int32_t size = 0; code = output.rspCode;
if (sStatus) { if (needStop) {
size = sizeof(SSchedulerStatusRsp) + sizeof(sStatus->status[0]) * sStatus->num; QW_TASK_DLOG("task need stop", NULL);
} else { QW_ERR_JRET(code);
size = sizeof(SSchedulerStatusRsp);
} }
SSchedulerStatusRsp *pRsp = (SSchedulerStatusRsp *)rpcMallocCont(size); code = qStringToSubplan(qwMsg->msg, &plan);
if (TSDB_CODE_SUCCESS != code) {
if (sStatus) { QW_TASK_ELOG("task string to subplan failed, code:%x", code);
memcpy(pRsp, sStatus, size); QW_ERR_JRET(code);
} else {
pRsp->num = 0;
}
SRpcMsg rpcRsp = {
.msgType = pMsg->msgType + 1,
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.pCont = pRsp,
.contLen = size,
.code = 0,
};
rpcSendResponse(&rpcRsp);
return TSDB_CODE_SUCCESS;
}
int32_t qwInitFetchRsp(int32_t length, SRetrieveTableRsp **rsp) {
int32_t msgSize = sizeof(SRetrieveTableRsp) + length;
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(msgSize);
if (NULL == pRsp) {
qError("rpcMallocCont %d failed", msgSize);
QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
memset(pRsp, 0, sizeof(SRetrieveTableRsp));
*rsp = pRsp;
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendFetchRsp(SRpcMsg *pMsg, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) {
if (NULL == pRsp) {
pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
memset(pRsp, 0, sizeof(SRetrieveTableRsp));
dataLength = 0;
}
SRpcMsg rpcRsp = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.pCont = pRsp,
.contLen = sizeof(*pRsp) + dataLength,
.code = code,
};
rpcSendResponse(&rpcRsp);
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code) {
STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp));
pRsp->code = code;
SRpcMsg rpcRsp = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = code,
};
rpcSendResponse(&rpcRsp);
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendDropRsp(SRpcMsg *pMsg, int32_t code) {
STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp));
pRsp->code = code;
SRpcMsg rpcRsp = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = code,
};
rpcSendResponse(&rpcRsp);
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) {
int32_t numOfCols = 6;
int32_t msgSize = sizeof(SVShowTablesRsp) + sizeof(SSchema) * numOfCols;
SVShowTablesRsp *pRsp = (SVShowTablesRsp *)rpcMallocCont(msgSize);
int32_t cols = 0;
SSchema *pSchema = pRsp->metaInfo.pSchema;
const SSchema *s = tGetTbnameColumnSchema();
*pSchema = createSchema(s->type, htonl(s->bytes), htonl(++cols), "name");
pSchema++;
int32_t type = TSDB_DATA_TYPE_TIMESTAMP;
*pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(++cols), "created");
pSchema++;
type = TSDB_DATA_TYPE_SMALLINT;
*pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(++cols), "columns");
pSchema++;
*pSchema = createSchema(s->type, htonl(s->bytes), htonl(++cols), "stable");
pSchema++;
type = TSDB_DATA_TYPE_BIGINT;
*pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(++cols), "uid");
pSchema++;
type = TSDB_DATA_TYPE_INT;
*pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(++cols), "vgId");
assert(cols == numOfCols);
pRsp->metaInfo.numOfColumns = htonl(cols);
SRpcMsg rpcMsg = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.pCont = pRsp,
.contLen = msgSize,
.code = code,
};
rpcSendResponse(&rpcMsg);
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchReq) {
SVShowTablesFetchRsp *pRsp = (SVShowTablesFetchRsp *)rpcMallocCont(sizeof(SVShowTablesFetchRsp));
int32_t handle = htonl(pFetchReq->id);
pRsp->numOfRows = 0;
SRpcMsg rpcMsg = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = 0,
};
rpcSendResponse(&rpcMsg);
return TSDB_CODE_SUCCESS;
}
int32_t qwCheckAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SRpcMsg *pMsg) {
SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL;
int32_t code = 0;
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch));
QW_ERR_JRET(qwAcquireTask(mgmt, QW_READ, sch, qId, tId, &task));
QW_LOCK(QW_WRITE, &task->lock);
if (QW_READY_NOT_RECEIVED == task->ready) {
QW_SCH_TASK_DLOG("ready not received, ready:%d", task->ready);
goto _return;
} else if (QW_READY_RECEIVED == task->ready) {
task->ready = QW_READY_RESPONSED;
int32_t rspCode = task->code;
QW_UNLOCK(QW_WRITE, &task->lock);
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
QW_ERR_RET(qwBuildAndSendReadyRsp(pMsg, rspCode));
QW_SCH_TASK_DLOG("ready response sent, ready:%d", task->ready);
return TSDB_CODE_SUCCESS;
} else if (QW_READY_RESPONSED == task->ready) {
QW_SCH_TASK_ELOG("ready response already send, ready:%d", task->ready);
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} else {
assert(0);
} }
_return: qTaskInfo_t pTaskInfo = NULL;
code = qCreateExecTask(node, 0, (struct SSubplan *)plan, &pTaskInfo);
if (task) { if (code) {
QW_UNLOCK(QW_WRITE, &task->lock); QW_TASK_ELOG("qCreateExecTask failed, code:%x", code);
qwReleaseTask(QW_READ, sch); QW_ERR_JRET(code);
} }
qwReleaseScheduler(QW_READ, mgmt);
QW_RET(code);
}
int32_t qwSetAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SRpcMsg *pMsg) {
SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL;
int32_t code = 0;
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch));
QW_ERR_JRET(qwAcquireTask(mgmt, QW_READ, sch, qId, tId, &task));
QW_LOCK(QW_WRITE, &task->lock);
int8_t status = task->status;
int32_t errCode = task->code;
if (QW_TASK_READY(status)) { QW_ERR_JRET(qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_SUCCESS));
task->ready = QW_READY_RESPONSED;
QW_UNLOCK(QW_WRITE, &task->lock);
QW_ERR_JRET(qwBuildAndSendReadyRsp(pMsg, errCode));
QW_SCH_TASK_DLOG("task ready responsed, status:%d", status);
} else {
task->ready = QW_READY_RECEIVED;
QW_UNLOCK(QW_WRITE, &task->lock);
QW_SCH_TASK_DLOG("task ready NOT responsed, status:%d", status);
}
_return:
if (task) {
qwReleaseTask(QW_READ, sch);
}
qwReleaseScheduler(QW_READ, mgmt);
QW_RET(code);
}
int32_t qwCheckAndProcessTaskDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, bool *needStop) {
SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL;
int32_t code = 0;
int8_t status = JOB_TASK_STATUS_CANCELLED;
*needStop = false;
if (qwAcquireScheduler(QW_READ, mgmt, sId, &sch)) {
return TSDB_CODE_SUCCESS;
}
if (qwAcquireTask(mgmt, QW_READ, sch, qId, tId, &task)) { queryRsped = true;
qwReleaseScheduler(QW_READ, mgmt);
return TSDB_CODE_SUCCESS;
}
if ((!atomic_load_8(&task->cancel)) && (!atomic_load_8(&task->drop))) {
QW_TASK_ELOG("no cancel or drop but task exists, status:%d", atomic_load_8(&task->status));
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
*needStop = true; DataSinkHandle sinkHandle = NULL;
code = qExecTask(pTaskInfo, &sinkHandle);
if (atomic_load_8(&task->cancel)) { if (code) {
QW_LOCK(QW_WRITE, &task->lock); QW_TASK_ELOG("qExecTask failed, code:%x", code);
code = qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &status, QW_IDS());
QW_UNLOCK(QW_WRITE, &task->lock);
QW_ERR_JRET(code); QW_ERR_JRET(code);
} }
if (task->drop) { QW_ERR_JRET(qwUpdateTaskCtxHandles(QW_FPARAMS(), pTaskInfo, sinkHandle));
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
QW_RET(qwDropTask(mgmt, sId, qId, tId));
}
_return: _return:
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
return TSDB_CODE_SUCCESS;
}
int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t status, int32_t errCode) {
SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL;
int32_t code = 0;
int8_t newStatus = JOB_TASK_STATUS_CANCELLED;
code = qwAcquireAddScheduler(QW_READ, mgmt, sId, &sch);
if (code) {
QW_TASK_ELOG("sId:%"PRIx64" not in cache", sId);
QW_ERR_RET(code);
}
code = qwAcquireTask(mgmt, QW_READ, sch, qId, tId, &task);
if (code) { if (code) {
QW_TASK_ELOG("sId:%"PRIx64" queryId:%"PRIx64" taskId:%"PRIx64" not in cache", sId, qId, tId); rspCode = code;
QW_ERR_RET(code);
}
QW_LOCK(QW_WRITE, &task->lock);
if (task->cancel) {
qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &newStatus, QW_IDS());
} }
if (task->drop) { if (!queryRsped) {
QW_UNLOCK(QW_WRITE, &task->lock); code = qwBuildAndSendQueryRsp(qwMsg->connection, rspCode);
if (TSDB_CODE_SUCCESS == rspCode && code) {
qwReleaseTask(QW_READ, sch); rspCode = code;
qwReleaseScheduler(QW_READ, mgmt); }
qwDropTask(mgmt, sId, qId, tId);
return TSDB_CODE_SUCCESS;
} }
if (!(task->cancel || task->drop)) { if (needStop) {
qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &status, QW_IDS()); QW_RET(rspCode);
task->code = errCode;
} }
QW_UNLOCK(QW_WRITE, &task->lock); input.code = rspCode;
qwReleaseTask(QW_READ, sch); if (TSDB_CODE_SUCCESS != rspCode) {
qwReleaseScheduler(QW_READ, mgmt); input.status = JOB_TASK_STATUS_FAILED;
} else {
return TSDB_CODE_SUCCESS; input.status = JOB_TASK_STATUS_PARTIAL_SUCCEED;
}
int32_t qwScheduleDataSink(SQWTaskCtx *handles, SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) {
if (atomic_load_8(&handles->sinkScheduled)) {
qDebug("data sink already scheduled");
return TSDB_CODE_SUCCESS;
} }
SSinkDataReq * req = (SSinkDataReq *)rpcMallocCont(sizeof(SSinkDataReq)); QW_ERR_RET(qwHandleTaskEvent(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, &output));
if (NULL == req) {
qError("rpcMallocCont %d failed", (int32_t)sizeof(SSinkDataReq));
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
req->header.vgId = mgmt->nodeId;
req->sId = sId;
req->queryId = queryId;
req->taskId = taskId;
SRpcMsg pNewMsg = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.msgType = TDMT_VND_SCHEDULE_DATA_SINK,
.pCont = req,
.contLen = sizeof(SSinkDataReq),
.code = 0,
};
int32_t code = (*mgmt->putToQueueFp)(mgmt->nodeObj, &pNewMsg); if (queryRsped && output.needRsp) {
if (TSDB_CODE_SUCCESS != code) { qwBuildAndSendReadyRsp(qwMsg->connection, output.rspCode);
qError("put data sink schedule msg to queue failed, code:%x", code);
rpcFreeCont(req);
QW_ERR_RET(code);
}
qDebug("put data sink schedule msg to query queue");
return TSDB_CODE_SUCCESS;
}
int32_t qwScheduleQuery(SQWTaskCtx *handles, SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SRpcMsg *pMsg) {
if (atomic_load_8(&handles->queryScheduled)) {
QW_SCH_TASK_ELOG("query already scheduled, queryScheduled:%d", handles->queryScheduled);
return TSDB_CODE_SUCCESS;
}
QW_ERR_RET(qwUpdateTaskStatus(mgmt, sId, qId, tId, JOB_TASK_STATUS_EXECUTING));
SQueryContinueReq * req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq));
if (NULL == req) {
QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(SQueryContinueReq));
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
req->header.vgId = mgmt->nodeId;
req->sId = sId;
req->queryId = qId;
req->taskId = tId;
SRpcMsg pNewMsg = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.msgType = TDMT_VND_QUERY_CONTINUE,
.pCont = req,
.contLen = sizeof(SQueryContinueReq),
.code = 0,
};
int32_t code = (*mgmt->putToQueueFp)(mgmt->nodeObj, &pNewMsg);
if (TSDB_CODE_SUCCESS != code) {
QW_SCH_TASK_ELOG("put query continue msg to queue failed, code:%x", code);
rpcFreeCont(req);
QW_ERR_RET(code);
} }
handles->queryScheduled = true; QW_RET(rspCode);
QW_SCH_TASK_DLOG("put query continue msg to query queue, vgId:%d", mgmt->nodeId);
return TSDB_CODE_SUCCESS;
} }
int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SRpcMsg *pMsg) { int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg) {
SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL;
int32_t code = 0; int32_t code = 0;
int32_t needRsp = true; int32_t needRsp = true;
void *data = NULL; void *data = NULL;
int32_t sinkStatus = 0; int32_t sinkStatus = 0;
int32_t dataLength = 0; int32_t dataLen = 0;
SRetrieveTableRsp *rsp = NULL;
bool queryEnd = false; bool queryEnd = false;
SQWTaskCtx *handles = NULL; bool needStop = false;
bool locked = false;
SQWTaskCtx *ctx = NULL;
int8_t status = 0; int8_t status = 0;
void *rsp = NULL;
QW_ERR_JRET(qwAcquireTaskCtx(QW_READ, mgmt, qId, tId, &handles)); SQWPhaseInput input = {0};
QW_LOCK(QW_WRITE, &handles->lock); SQWPhaseOutput output = {0};
if (handles->needRsp) {
QW_UNLOCK(QW_WRITE, &handles->lock);
QW_SCH_TASK_ELOG("last fetch not responsed, needRsp:%d", handles->needRsp);
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
QW_UNLOCK(QW_WRITE, &handles->lock);
QW_ERR_JRET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch)); QW_ERR_JRET(qwHandleTaskEvent(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, &output));
QW_ERR_JRET(qwAcquireTask(mgmt, QW_READ, sch, qId, tId, &task));
needStop = output.needStop;
if (task->cancel || task->drop) { code = output.rspCode;
QW_SCH_TASK_ELOG("task is already cancelled or dropped, cancel:%d, drop:%d", task->cancel, task->drop);
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); if (needStop) {
} QW_TASK_DLOG("task need stop", NULL);
QW_ERR_JRET(code);
if (task->status != JOB_TASK_STATUS_EXECUTING && task->status != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
QW_SCH_TASK_ELOG("invalid status %d for fetch", task->status);
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} }
dsGetDataLength(handles->sinkHandle, &dataLength, &queryEnd); QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), QW_READ, &ctx));
if (dataLength > 0) { QW_LOCK(QW_WRITE, &ctx->lock);
SOutputData output = {0};
QW_SCH_TASK_DLOG("task got data in sink, dataLength:%d", dataLength); locked = true;
QW_ERR_JRET(qwInitFetchRsp(dataLength, &rsp));
output.pData = rsp->data;
code = dsGetDataBlock(handles->sinkHandle, &output);
if (code) {
qError("dsGetDataBlock failed, code:%x", code);
QW_ERR_JRET(code);
}
rsp->useconds = htobe64(output.useconds); QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp));
rsp->completed = 0;
rsp->precision = output.precision;
rsp->compressed = output.compressed;
rsp->compLen = htonl(dataLength);
rsp->numOfRows = htonl(output.numOfRows);
if (DS_BUF_EMPTY == output.bufStatus && output.queryEnd) {
rsp->completed = 1;
status = JOB_TASK_STATUS_SUCCEED;
QW_SCH_TASK_DLOG("task all fetched, status:%d", status);
QW_ERR_JRET(qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &status, QW_IDS()));
}
// Note: schedule data sink firstly and will schedule query after it's done
if (output.needSchedule) {
QW_SCH_TASK_DLOG("sink need schedule, queryEnd:%d", output.queryEnd);
QW_ERR_JRET(qwScheduleDataSink(handles, mgmt, sId, qId, tId, pMsg));
} else if ((!output.queryEnd) && (DS_BUF_LOW == output.bufStatus || DS_BUF_EMPTY == output.bufStatus)) {
QW_SCH_TASK_DLOG("task not end, need to continue, bufStatus:%d", output.bufStatus);
QW_ERR_JRET(qwScheduleQuery(handles, mgmt, sId, qId, tId, pMsg));
}
} else {
if (dataLength < 0) {
QW_SCH_TASK_ELOG("invalid length from dsGetDataLength, length:%d", dataLength);
QW_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
if (queryEnd) {
status = JOB_TASK_STATUS_SUCCEED;
QW_SCH_TASK_DLOG("no data in sink and query end, dataLength:%d", dataLength); _return:
QW_ERR_JRET(qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &status, QW_IDS()));
} else {
assert(0 == handles->needRsp);
// MUST IN SCHEDULE OR IN SINK SCHEDULE if (locked) {
QW_UNLOCK(QW_WRITE, &ctx->lock);
QW_SCH_TASK_DLOG("no res data in sink, need response later, queryEnd:%d", queryEnd); }
QW_LOCK(QW_WRITE, &handles->lock); if (ctx) {
handles->needRsp = true; qwReleaseTaskCtx(QW_READ, mgmt);
QW_UNLOCK(QW_WRITE, &handles->lock); }
needRsp = false; if (needRsp) {
} qwBuildAndSendFetchRsp(pMsg, rsp, dataLen, code);
} }
QW_RET(code);
}
_return:
if (task) { int32_t qwProcessDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg) {
qwReleaseTask(QW_READ, sch); int32_t code = 0;
} bool needRsp = false;
if (sch) { QW_ERR_JRET(qwDropTask(QW_FPARAMS(), &needRsp));
qwReleaseScheduler(QW_READ, mgmt);
}
if (needRsp) { _return:
qwBuildAndSendFetchRsp(pMsg, rsp, dataLength, code);
}
if (handles) { if (TSDB_CODE_SUCCESS != code || needRsp) {
qwReleaseTaskResCache(QW_READ, mgmt); QW_ERR_RET(qwBuildAndSendDropRsp(qwMsg->connection, code));
} }
QW_RET(code); return TSDB_CODE_SUCCESS;
} }
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, void *nodeObj, putReqToQueryQFp fp) { int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, void *nodeObj, putReqToQueryQFp fp) {
...@@ -1226,337 +879,261 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW ...@@ -1226,337 +879,261 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { void qWorkerDestroy(void **qWorkerMgmt) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { if (NULL == qWorkerMgmt || NULL == *qWorkerMgmt) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); return;
}
int32_t code = 0;
bool queryRsped = false;
bool needStop = false;
struct SSubplan *plan = NULL;
SSubQueryMsg *msg = pMsg->pCont;
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
int32_t rspCode = 0;
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
QW_ELOG("invalid query msg, contLen:%d", pMsg->contLen);
QW_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
} }
msg->sId = be64toh(msg->sId); SQWorkerMgmt *mgmt = *qWorkerMgmt;
msg->queryId = be64toh(msg->queryId);
msg->taskId = be64toh(msg->taskId);
msg->contentLen = ntohl(msg->contentLen);
uint64_t sId = msg->sId; //TODO STOP ALL QUERY
uint64_t qId = msg->queryId;
uint64_t tId = msg->taskId;
QW_ERR_JRET(qwCheckAndProcessTaskDrop(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, &needStop)); //TODO FREE ALL
if (needStop) {
QW_TASK_DLOG("task need stop, msgLen:%d", msg->contentLen);
qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_QRY_TASK_CANCELLED);
QW_ERR_RET(TSDB_CODE_QRY_TASK_CANCELLED);
}
QW_ERR_JRET(qwAddTask(qWorkerMgmt, sId, qId, tId, JOB_TASK_STATUS_EXECUTING)); tfree(*qWorkerMgmt);
}
code = qStringToSubplan(msg->msg, &plan);
if (TSDB_CODE_SUCCESS != code) {
QW_TASK_ELOG("string to subplan failed, code:%d", code);
QW_ERR_JRET(code);
}
qTaskInfo_t pTaskInfo = NULL;
code = qCreateExecTask(node, 0, (struct SSubplan *)plan, &pTaskInfo);
if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%x", code);
QW_ERR_JRET(code);
}
QW_ERR_JRET(qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_SUCCESS));
queryRsped = true; #if 0
#endif
DataSinkHandle sinkHandle = NULL;
code = qExecTask(pTaskInfo, &sinkHandle);
if (code) {
QW_TASK_ELOG("qExecTask failed, code:%x", code);
QW_ERR_JRET(code);
}
QW_ERR_JRET(qwAddTaskHandlesToCache(qWorkerMgmt, msg->queryId, msg->taskId, pTaskInfo, sinkHandle));
_return:
if (code) {
rspCode = code;
}
if (!queryRsped) {
code = qwBuildAndSendQueryRsp(pMsg, rspCode);
if (TSDB_CODE_SUCCESS == rspCode && code) {
rspCode = code;
}
}
int8_t status = 0;
if (TSDB_CODE_SUCCESS != rspCode) {
status = JOB_TASK_STATUS_FAILED;
} else {
status = JOB_TASK_STATUS_PARTIAL_SUCCEED;
}
qwQueryPostProcess(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, status, rspCode);
if (queryRsped) {
qwCheckAndSendReadyRsp(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, pMsg);
}
QW_RET(rspCode);
}
int32_t qWorkerProcessQueryContinueMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t code = 0;
int8_t status = 0;
bool queryDone = false;
SQueryContinueReq *req = (SQueryContinueReq *)pMsg->pCont;
bool needStop = false;
SQWTaskCtx *handles = NULL;
QW_ERR_JRET(qwAcquireTaskCtx(QW_READ, qWorkerMgmt, req->queryId, req->taskId, &handles));
QW_LOCK(QW_WRITE, &handles->lock);
qTaskInfo_t taskHandle = handles->taskHandle;
DataSinkHandle sinkHandle = handles->sinkHandle;
QW_UNLOCK(QW_WRITE, &handles->lock);
qwReleaseTaskResCache(QW_READ, qWorkerMgmt);
QW_ERR_JRET(qwCheckAndProcessTaskDrop(qWorkerMgmt, req->sId, req->queryId, req->taskId, &needStop));
if (needStop) {
qWarn("task need stop");
QW_ERR_JRET(qwAcquireTaskCtx(QW_READ, qWorkerMgmt, req->queryId, req->taskId, &handles));
QW_LOCK(QW_WRITE, &handles->lock);
if (handles->needRsp) {
qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_QRY_TASK_CANCELLED);
handles->needRsp = false;
}
QW_UNLOCK(QW_WRITE, &handles->lock);
qwReleaseTaskResCache(QW_READ, qWorkerMgmt);
QW_ERR_RET(TSDB_CODE_QRY_TASK_CANCELLED);
}
DataSinkHandle newHandle = NULL; int32_t qwScheduleDataSink(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx *ctx, SRpcMsg *pMsg) {
code = qExecTask(taskHandle, &newHandle); if (atomic_load_8(&handles->sinkScheduled)) {
if (code) { qDebug("data sink already scheduled");
qError("qExecTask failed, code:%x", code); return TSDB_CODE_SUCCESS;
QW_ERR_JRET(code);
} }
if (sinkHandle != newHandle) { SSinkDataReq * req = (SSinkDataReq *)rpcMallocCont(sizeof(SSinkDataReq));
qError("data sink mis-match"); if (NULL == req) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); qError("rpcMallocCont %d failed", (int32_t)sizeof(SSinkDataReq));
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
_return:
QW_ERR_JRET(qwAcquireTaskCtx(QW_READ, qWorkerMgmt, req->queryId, req->taskId, &handles)); req->header.vgId = mgmt->nodeId;
QW_LOCK(QW_WRITE, &handles->lock); req->sId = sId;
req->queryId = queryId;
if (handles->needRsp) { req->taskId = taskId;
code = qwBuildAndSendQueryRsp(pMsg, code);
handles->needRsp = false;
}
handles->queryScheduled = false;
QW_UNLOCK(QW_WRITE, &handles->lock); SRpcMsg pNewMsg = {
qwReleaseTaskResCache(QW_READ, qWorkerMgmt); .handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.msgType = TDMT_VND_SCHEDULE_DATA_SINK,
.pCont = req,
.contLen = sizeof(SSinkDataReq),
.code = 0,
};
int32_t code = (*mgmt->putToQueueFp)(mgmt->nodeObj, &pNewMsg);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
status = JOB_TASK_STATUS_FAILED; qError("put data sink schedule msg to queue failed, code:%x", code);
} else { rpcFreeCont(req);
status = JOB_TASK_STATUS_PARTIAL_SUCCEED; QW_ERR_RET(code);
} }
code = qwQueryPostProcess(qWorkerMgmt, req->sId, req->queryId, req->taskId, status, code); qDebug("put data sink schedule msg to query queue");
QW_RET(code); return TSDB_CODE_SUCCESS;
} }
int32_t qWorkerProcessDataSinkMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){ int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SSchedulerStatusRsp **rsp) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { SQWSchStatus *sch = NULL;
return TSDB_CODE_QRY_INVALID_INPUT; int32_t taskNum = 0;
}
SSinkDataReq *msg = pMsg->pCont; QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch));
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
qError("invalid sink data msg"); sch->lastAccessTs = taosGetTimestampSec();
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
//dsScheduleProcess(); QW_LOCK(QW_READ, &sch->tasksLock);
//TODO
taskNum = taosHashGetSize(sch->tasksHash);
int32_t size = sizeof(SSchedulerStatusRsp) + sizeof((*rsp)->status[0]) * taskNum;
*rsp = calloc(1, size);
if (NULL == *rsp) {
qError("calloc %d failed", size);
QW_UNLOCK(QW_READ, &sch->tasksLock);
qwReleaseScheduler(QW_READ, mgmt);
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
return TSDB_CODE_SUCCESS; void *key = NULL;
} size_t keyLen = 0;
int32_t i = 0;
int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){ void *pIter = taosHashIterate(sch->tasksHash, NULL);
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { while (pIter) {
return TSDB_CODE_QRY_INVALID_INPUT; SQWTaskStatus *taskStatus = (SQWTaskStatus *)pIter;
} taosHashGetKey(pIter, &key, &keyLen);
SResReadyReq *msg = pMsg->pCont; QW_GET_QTID(key, (*rsp)->status[i].queryId, (*rsp)->status[i].taskId);
if (NULL == msg || pMsg->contLen < sizeof(*msg)) { (*rsp)->status[i].status = taskStatus->status;
qError("invalid task status msg");
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); pIter = taosHashIterate(sch->tasksHash, pIter);
} }
msg->sId = htobe64(msg->sId); QW_UNLOCK(QW_READ, &sch->tasksLock);
msg->queryId = htobe64(msg->queryId); qwReleaseScheduler(QW_READ, mgmt);
msg->taskId = htobe64(msg->taskId);
(*rsp)->num = taskNum;
QW_ERR_RET(qwSetAndSendReadyRsp(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, pMsg));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
int32_t code = 0;
SSchTasksStatusReq *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
qError("invalid task status msg");
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
msg->sId = htobe64(msg->sId); int32_t qwUpdateSchLastAccess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
SQWSchStatus *sch = NULL;
SSchedulerStatusRsp *sStatus = NULL; QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch));
QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->sId, &sStatus));
_return: sch->lastAccessTs = taosGetTimestampSec();
QW_ERR_RET(qwBuildAndSendStatusRsp(pMsg, sStatus)); qwReleaseScheduler(QW_READ, mgmt);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
SResFetchReq *msg = pMsg->pCont; int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t *taskStatus) {
if (NULL == msg || pMsg->contLen < sizeof(*msg)) { SQWSchStatus *sch = NULL;
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); SQWTaskStatus *task = NULL;
} int32_t code = 0;
if (qwAcquireScheduler(QW_READ, mgmt, sId, &sch)) {
*taskStatus = JOB_TASK_STATUS_NULL;
return TSDB_CODE_SUCCESS;
}
msg->sId = htobe64(msg->sId); if (qwAcquireTask(mgmt, QW_READ, sch, queryId, taskId, &task)) {
msg->queryId = htobe64(msg->queryId); qwReleaseScheduler(QW_READ, mgmt);
msg->taskId = htobe64(msg->taskId);
*taskStatus = JOB_TASK_STATUS_NULL;
return TSDB_CODE_SUCCESS;
}
QW_ERR_RET(qwUpdateSchLastAccess(qWorkerMgmt, msg->sId)); *taskStatus = task->status;
void *data = NULL; qwReleaseTask(QW_READ, sch);
int32_t code = 0; qwReleaseScheduler(QW_READ, mgmt);
QW_ERR_RET(qwHandleFetch(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, pMsg));
QW_RET(code); QW_RET(code);
} }
int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
int32_t qwCancelTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL;
int32_t code = 0; int32_t code = 0;
STaskCancelReq *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
qError("invalid task cancel msg");
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
msg->sId = htobe64(msg->sId); QW_ERR_RET(qwAcquireAddScheduler(QW_READ, mgmt, sId, &sch));
msg->queryId = htobe64(msg->queryId);
msg->taskId = htobe64(msg->taskId);
QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId)); QW_ERR_JRET(qwAcquireAddTask(mgmt, QW_READ, sch, qId, tId, JOB_TASK_STATUS_NOT_START, &task));
_return:
QW_ERR_RET(qwBuildAndSendCancelRsp(pMsg, code)); QW_LOCK(QW_WRITE, &task->lock);
return TSDB_CODE_SUCCESS; task->cancel = true;
}
int8_t oriStatus = task->status;
int8_t newStatus = 0;
if (task->status == JOB_TASK_STATUS_CANCELLED || task->status == JOB_TASK_STATUS_NOT_START || task->status == JOB_TASK_STATUS_CANCELLING || task->status == JOB_TASK_STATUS_DROPPING) {
QW_UNLOCK(QW_WRITE, &task->lock);
int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { qwReleaseTask(QW_READ, sch);
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { qwReleaseScheduler(QW_READ, mgmt);
return TSDB_CODE_QRY_INVALID_INPUT;
return TSDB_CODE_SUCCESS;
} else if (task->status == JOB_TASK_STATUS_FAILED || task->status == JOB_TASK_STATUS_SUCCEED || task->status == JOB_TASK_STATUS_PARTIAL_SUCCEED) {
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED));
} else {
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLING));
} }
int32_t code = 0; QW_UNLOCK(QW_WRITE, &task->lock);
STaskDropReq *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen < sizeof(*msg)) { qwReleaseTask(QW_READ, sch);
qError("invalid task drop msg"); qwReleaseScheduler(QW_READ, mgmt);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
msg->sId = htobe64(msg->sId);
msg->queryId = htobe64(msg->queryId);
msg->taskId = htobe64(msg->taskId);
QW_ERR_JRET(qwCancelDropTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId)); if (oriStatus == JOB_TASK_STATUS_EXECUTING) {
//TODO call executer to cancel subquery async
}
return TSDB_CODE_SUCCESS;
_return: _return:
QW_ERR_RET(qwBuildAndSendDropRsp(pMsg, code)); if (task) {
QW_UNLOCK(QW_WRITE, &task->lock);
qwReleaseTask(QW_READ, sch);
}
if (sch) {
qwReleaseScheduler(QW_READ, mgmt);
}
return TSDB_CODE_SUCCESS; QW_RET(code);
} }
int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { int32_t qwScheduleQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx *handles, SRpcMsg *pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT; if (atomic_load_8(&handles->queryScheduled)) {
QW_SCH_TASK_ELOG("query already scheduled, queryScheduled:%d", handles->queryScheduled);
return TSDB_CODE_SUCCESS;
} }
int32_t code = 0; QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING));
SVShowTablesReq *pReq = pMsg->pCont;
QW_ERR_RET(qwBuildAndSendShowRsp(pMsg, code));
}
int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { SQueryContinueReq * req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq));
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { if (NULL == req) {
return TSDB_CODE_QRY_INVALID_INPUT; QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(SQueryContinueReq));
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
SVShowTablesFetchReq *pFetchReq = pMsg->pCont; req->header.vgId = mgmt->nodeId;
QW_ERR_RET(qwBuildAndSendShowFetchRsp(pMsg, pFetchReq)); req->sId = sId;
} req->queryId = qId;
req->taskId = tId;
void qWorkerDestroy(void **qWorkerMgmt) { SRpcMsg pNewMsg = {
if (NULL == qWorkerMgmt || NULL == *qWorkerMgmt) { .handle = pMsg->handle,
return; .ahandle = pMsg->ahandle,
.msgType = TDMT_VND_QUERY_CONTINUE,
.pCont = req,
.contLen = sizeof(SQueryContinueReq),
.code = 0,
};
int32_t code = (*mgmt->putToQueueFp)(mgmt->nodeObj, &pNewMsg);
if (TSDB_CODE_SUCCESS != code) {
QW_SCH_TASK_ELOG("put query continue msg to queue failed, code:%x", code);
rpcFreeCont(req);
QW_ERR_RET(code);
} }
SQWorkerMgmt *mgmt = *qWorkerMgmt; handles->queryScheduled = true;
//TODO STOP ALL QUERY
//TODO FREE ALL QW_SCH_TASK_DLOG("put query continue msg to query queue, vgId:%d", mgmt->nodeId);
tfree(*qWorkerMgmt); return TSDB_CODE_SUCCESS;
} }
#include "qworker.h"
#include <common.h>
#include "executor.h"
#include "planner.h"
#include "query.h"
#include "qworkerInt.h"
#include "tmsg.h"
#include "tname.h"
#include "dataSinkMgt.h"
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) {
int32_t msgSize = sizeof(SRetrieveTableRsp) + length;
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(msgSize);
if (NULL == pRsp) {
qError("rpcMallocCont %d failed", msgSize);
QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
memset(pRsp, 0, sizeof(SRetrieveTableRsp));
*rsp = pRsp;
return TSDB_CODE_SUCCESS;
}
void qwBuildFetchRsp(SRetrieveTableRsp *rsp, SOutputData *input, int32_t len) {
rsp->useconds = htobe64(input->useconds);
rsp->completed = input->queryEnd;
rsp->precision = input->precision;
rsp->compressed = input->compressed;
rsp->compLen = htonl(len);
rsp->numOfRows = htonl(input->numOfRows);
}
void qwFreeFetchRsp(void *msg) {
rpcFreeCont(msg);
}
int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code) {
SRpcMsg *pMsg = (SRpcMsg *)connection;
SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->code = code;
SRpcMsg rpcRsp = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = code,
};
rpcSendResponse(&rpcRsp);
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendReadyRsp(void *connection, int32_t code) {
SRpcMsg *pMsg = (SRpcMsg *)connection;
SResReadyRsp *pRsp = (SResReadyRsp *)rpcMallocCont(sizeof(SResReadyRsp));
pRsp->code = code;
SRpcMsg rpcRsp = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = code,
};
rpcSendResponse(&rpcRsp);
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendStatusRsp(SRpcMsg *pMsg, SSchedulerStatusRsp *sStatus) {
int32_t size = 0;
if (sStatus) {
size = sizeof(SSchedulerStatusRsp) + sizeof(sStatus->status[0]) * sStatus->num;
} else {
size = sizeof(SSchedulerStatusRsp);
}
SSchedulerStatusRsp *pRsp = (SSchedulerStatusRsp *)rpcMallocCont(size);
if (sStatus) {
memcpy(pRsp, sStatus, size);
} else {
pRsp->num = 0;
}
SRpcMsg rpcRsp = {
.msgType = pMsg->msgType + 1,
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.pCont = pRsp,
.contLen = size,
.code = 0,
};
rpcSendResponse(&rpcRsp);
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendFetchRsp(SRpcMsg *pMsg, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) {
if (NULL == pRsp) {
pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
memset(pRsp, 0, sizeof(SRetrieveTableRsp));
dataLength = 0;
}
SRpcMsg rpcRsp = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.pCont = pRsp,
.contLen = sizeof(*pRsp) + dataLength,
.code = code,
};
rpcSendResponse(&rpcRsp);
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code) {
STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp));
pRsp->code = code;
SRpcMsg rpcRsp = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = code,
};
rpcSendResponse(&rpcRsp);
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendDropRsp(void *connection, int32_t code) {
SRpcMsg *pMsg = (SRpcMsg *)connection;
STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp));
pRsp->code = code;
SRpcMsg rpcRsp = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = code,
};
rpcSendResponse(&rpcRsp);
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) {
int32_t numOfCols = 6;
int32_t msgSize = sizeof(SVShowTablesRsp) + sizeof(SSchema) * numOfCols;
SVShowTablesRsp *pRsp = (SVShowTablesRsp *)rpcMallocCont(msgSize);
int32_t cols = 0;
SSchema *pSchema = pRsp->metaInfo.pSchema;
const SSchema *s = tGetTbnameColumnSchema();
*pSchema = createSchema(s->type, htonl(s->bytes), htonl(++cols), "name");
pSchema++;
int32_t type = TSDB_DATA_TYPE_TIMESTAMP;
*pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(++cols), "created");
pSchema++;
type = TSDB_DATA_TYPE_SMALLINT;
*pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(++cols), "columns");
pSchema++;
*pSchema = createSchema(s->type, htonl(s->bytes), htonl(++cols), "stable");
pSchema++;
type = TSDB_DATA_TYPE_BIGINT;
*pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(++cols), "uid");
pSchema++;
type = TSDB_DATA_TYPE_INT;
*pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(++cols), "vgId");
assert(cols == numOfCols);
pRsp->metaInfo.numOfColumns = htonl(cols);
SRpcMsg rpcMsg = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.pCont = pRsp,
.contLen = msgSize,
.code = code,
};
rpcSendResponse(&rpcMsg);
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchReq) {
SVShowTablesFetchRsp *pRsp = (SVShowTablesFetchRsp *)rpcMallocCont(sizeof(SVShowTablesFetchRsp));
int32_t handle = htonl(pFetchReq->id);
pRsp->numOfRows = 0;
SRpcMsg rpcMsg = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = 0,
};
rpcSendResponse(&rpcMsg);
return TSDB_CODE_SUCCESS;
}
int32_t qwSetAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SRpcMsg *pMsg) {
SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL;
int32_t code = 0;
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch));
QW_ERR_JRET(qwAcquireTask(mgmt, QW_READ, sch, qId, tId, &task));
QW_LOCK(QW_WRITE, &task->lock);
int8_t status = task->status;
int32_t errCode = task->code;
if (QW_TASK_READY(status)) {
task->ready = QW_READY_RESPONSED;
QW_UNLOCK(QW_WRITE, &task->lock);
QW_ERR_JRET(qwBuildAndSendReadyRsp(pMsg, errCode));
QW_SCH_TASK_DLOG("task ready responsed, status:%d", status);
} else {
task->ready = QW_READY_RECEIVED;
QW_UNLOCK(QW_WRITE, &task->lock);
QW_SCH_TASK_DLOG("task ready NOT responsed, status:%d", status);
}
_return:
if (task) {
qwReleaseTask(QW_READ, sch);
}
qwReleaseScheduler(QW_READ, mgmt);
QW_RET(code);
}
int32_t qwScheduleDataSink(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx *handles, SRpcMsg *pMsg) {
if (atomic_load_8(&handles->sinkScheduled)) {
qDebug("data sink already scheduled");
return TSDB_CODE_SUCCESS;
}
SSinkDataReq * req = (SSinkDataReq *)rpcMallocCont(sizeof(SSinkDataReq));
if (NULL == req) {
qError("rpcMallocCont %d failed", (int32_t)sizeof(SSinkDataReq));
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
req->header.vgId = mgmt->nodeId;
req->sId = sId;
req->queryId = queryId;
req->taskId = taskId;
SRpcMsg pNewMsg = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.msgType = TDMT_VND_SCHEDULE_DATA_SINK,
.pCont = req,
.contLen = sizeof(SSinkDataReq),
.code = 0,
};
int32_t code = (*mgmt->putToQueueFp)(mgmt->nodeObj, &pNewMsg);
if (TSDB_CODE_SUCCESS != code) {
qError("put data sink schedule msg to queue failed, code:%x", code);
rpcFreeCont(req);
QW_ERR_RET(code);
}
qDebug("put data sink schedule msg to query queue");
return TSDB_CODE_SUCCESS;
}
int32_t qwScheduleQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx *handles, SRpcMsg *pMsg) {
if (atomic_load_8(&handles->queryScheduled)) {
QW_SCH_TASK_ELOG("query already scheduled, queryScheduled:%d", handles->queryScheduled);
return TSDB_CODE_SUCCESS;
}
QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING));
SQueryContinueReq * req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq));
if (NULL == req) {
QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(SQueryContinueReq));
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
req->header.vgId = mgmt->nodeId;
req->sId = sId;
req->queryId = qId;
req->taskId = tId;
SRpcMsg pNewMsg = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.msgType = TDMT_VND_QUERY_CONTINUE,
.pCont = req,
.contLen = sizeof(SQueryContinueReq),
.code = 0,
};
int32_t code = (*mgmt->putToQueueFp)(mgmt->nodeObj, &pNewMsg);
if (TSDB_CODE_SUCCESS != code) {
QW_SCH_TASK_ELOG("put query continue msg to queue failed, code:%x", code);
rpcFreeCont(req);
QW_ERR_RET(code);
}
handles->queryScheduled = true;
QW_SCH_TASK_DLOG("put query continue msg to query queue, vgId:%d", mgmt->nodeId);
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
int32_t code = 0;
SSubQueryMsg *msg = pMsg->pCont;
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
QW_ELOG("invalid query msg, contLen:%d", pMsg->contLen);
QW_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
msg->sId = be64toh(msg->sId);
msg->queryId = be64toh(msg->queryId);
msg->taskId = be64toh(msg->taskId);
msg->contentLen = ntohl(msg->contentLen);
uint64_t sId = msg->sId;
uint64_t qId = msg->queryId;
uint64_t tId = msg->taskId;
SQWMsg qwMsg = {.node = node, .msg = msg->msg, .msgLen = msg->contentLen, .connection = pMsg};
QW_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg));
}
int32_t qWorkerProcessQueryContinueMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t code = 0;
int8_t status = 0;
bool queryDone = false;
SQueryContinueReq *req = (SQueryContinueReq *)pMsg->pCont;
bool needStop = false;
SQWTaskCtx *handles = NULL;
QW_ERR_JRET(qwAcquireTaskCtx(QW_READ, qWorkerMgmt, req->queryId, req->taskId, &handles));
QW_LOCK(QW_WRITE, &handles->lock);
qTaskInfo_t taskHandle = handles->taskHandle;
DataSinkHandle sinkHandle = handles->sinkHandle;
QW_UNLOCK(QW_WRITE, &handles->lock);
qwReleaseTaskResCache(QW_READ, qWorkerMgmt);
QW_ERR_JRET(qwCheckAndProcessTaskDrop(qWorkerMgmt, req->sId, req->queryId, req->taskId, &needStop));
if (needStop) {
qWarn("task need stop");
QW_ERR_JRET(qwAcquireTaskCtx(QW_READ, qWorkerMgmt, req->queryId, req->taskId, &handles));
QW_LOCK(QW_WRITE, &handles->lock);
if (handles->needRsp) {
qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_QRY_TASK_CANCELLED);
handles->needRsp = false;
}
QW_UNLOCK(QW_WRITE, &handles->lock);
qwReleaseTaskResCache(QW_READ, qWorkerMgmt);
QW_ERR_RET(TSDB_CODE_QRY_TASK_CANCELLED);
}
DataSinkHandle newHandle = NULL;
code = qExecTask(taskHandle, &newHandle);
if (code) {
qError("qExecTask failed, code:%x", code);
QW_ERR_JRET(code);
}
if (sinkHandle != newHandle) {
qError("data sink mis-match");
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
_return:
QW_ERR_JRET(qwAcquireTaskCtx(QW_READ, qWorkerMgmt, req->queryId, req->taskId, &handles));
QW_LOCK(QW_WRITE, &handles->lock);
if (handles->needRsp) {
code = qwBuildAndSendQueryRsp(pMsg, code);
handles->needRsp = false;
}
handles->queryScheduled = false;
QW_UNLOCK(QW_WRITE, &handles->lock);
qwReleaseTaskResCache(QW_READ, qWorkerMgmt);
if (TSDB_CODE_SUCCESS != code) {
status = JOB_TASK_STATUS_FAILED;
} else {
status = JOB_TASK_STATUS_PARTIAL_SUCCEED;
}
code = qwQueryPostProcess(qWorkerMgmt, req->sId, req->queryId, req->taskId, status, code);
QW_RET(code);
}
int32_t qWorkerProcessDataSinkMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
SSinkDataReq *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
qError("invalid sink data msg");
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
//dsScheduleProcess();
//TODO
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
SResReadyReq *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
qError("invalid task status msg");
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
msg->sId = htobe64(msg->sId);
msg->queryId = htobe64(msg->queryId);
msg->taskId = htobe64(msg->taskId);
QW_ERR_RET(qwSetAndSendReadyRsp(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, pMsg));
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
int32_t code = 0;
SSchTasksStatusReq *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
qError("invalid task status msg");
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
msg->sId = htobe64(msg->sId);
SSchedulerStatusRsp *sStatus = NULL;
QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->sId, &sStatus));
_return:
QW_ERR_RET(qwBuildAndSendStatusRsp(pMsg, sStatus));
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
SResFetchReq *msg = pMsg->pCont;
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
msg->sId = htobe64(msg->sId);
msg->queryId = htobe64(msg->queryId);
msg->taskId = htobe64(msg->taskId);
uint64_t sId = msg->sId;
uint64_t qId = msg->queryId;
uint64_t tId = msg->taskId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg};
QW_RET(qwProcessFetch(QW_FPARAMS(), &qwMsg));
}
int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
int32_t code = 0;
STaskCancelReq *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
qError("invalid task cancel msg");
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
msg->sId = htobe64(msg->sId);
msg->queryId = htobe64(msg->queryId);
msg->taskId = htobe64(msg->taskId);
QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId));
_return:
QW_ERR_RET(qwBuildAndSendCancelRsp(pMsg, code));
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
int32_t code = 0;
STaskDropReq *msg = pMsg->pCont;
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
QW_ELOG("invalid task drop msg", NULL);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
msg->sId = htobe64(msg->sId);
msg->queryId = htobe64(msg->queryId);
msg->taskId = htobe64(msg->taskId);
uint64_t sId = msg->sId;
uint64_t qId = msg->queryId;
uint64_t tId = msg->taskId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg};
QW_RET(qwProcessDrop(QW_FPARAMS(), &qwMsg));
}
int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
int32_t code = 0;
SVShowTablesReq *pReq = pMsg->pCont;
QW_ERR_RET(qwBuildAndSendShowRsp(pMsg, code));
}
int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
SVShowTablesFetchReq *pFetchReq = pMsg->pCont;
QW_ERR_RET(qwBuildAndSendShowFetchRsp(pMsg, pFetchReq));
}
...@@ -353,6 +353,14 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_SCH_NOT_EXIST, "Scheduler not exist") ...@@ -353,6 +353,14 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_SCH_NOT_EXIST, "Scheduler not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_NOT_EXIST, "Task not exist") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_NOT_EXIST, "Task not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_ALREADY_EXIST, "Task already exist") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_ALREADY_EXIST, "Task already exist")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST, "Task result cache not exist") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST, "Task result cache not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_CANCELLED, "Task cancelled")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_DROPPED, "Task dropped")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_CANCELLING, "Task cancelling")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_DROPPING, "Task dropping")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_DUPLICATTED_OPERATION, "Duplicatted operation")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_MSG_ERROR, "Task message error")
// grant // grant
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, "License expired") TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, "License expired")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册