提交 8e078d63 编写于 作者: D dapan1121

feature/qnode

上级 7a622fa6
......@@ -52,6 +52,10 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_SET_CUR, "mq-set-cur" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_RES_READY, "res-ready" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_TASKS_STATUS, "tasks-status" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CANCEL_TASK, "cancel-task" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_TASK, "drop-task" )
// message from client to mnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONNECT, "connect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_ACCT, "create-acct" )
......@@ -1093,29 +1097,29 @@ typedef struct {
/* data */
} SUpdateTagValRsp;
typedef struct SSchedulerQueryMsg {
typedef struct SSubQueryMsg {
uint64_t schedulerId;
uint64_t queryId;
uint64_t taskId;
uint32_t contentLen;
char msg[];
} SSchedulerQueryMsg;
} SSubQueryMsg;
typedef struct SSchedulerReadyMsg {
typedef struct SResReadyMsg {
uint64_t schedulerId;
uint64_t queryId;
uint64_t taskId;
} SSchedulerReadyMsg;
} SResReadyMsg;
typedef struct SSchedulerFetchMsg {
typedef struct SResFetchMsg {
uint64_t schedulerId;
uint64_t queryId;
uint64_t taskId;
} SSchedulerFetchMsg;
} SResFetchMsg;
typedef struct SSchedulerStatusMsg {
typedef struct SSchTasksStatusMsg {
uint64_t schedulerId;
} SSchedulerStatusMsg;
} SSchTasksStatusMsg;
typedef struct STaskStatus {
uint64_t queryId;
......@@ -1129,11 +1133,17 @@ typedef struct SSchedulerStatusRsp {
} SSchedulerStatusRsp;
typedef struct SSchedulerCancelMsg {
typedef struct STaskCancelMsg {
uint64_t schedulerId;
uint64_t queryId;
uint64_t taskId;
} STaskCancelMsg;
typedef struct STaskDropMsg {
uint64_t schedulerId;
uint64_t queryId;
uint64_t taskId;
} SSchedulerCancelMsg;
} STaskDropMsg;
#pragma pack(pop)
......
......@@ -25,12 +25,15 @@ extern "C" {
#include "tlog.h"
enum {
JOB_TASK_STATUS_NULL = 0,
JOB_TASK_STATUS_NOT_START = 1,
JOB_TASK_STATUS_EXECUTING,
JOB_TASK_STATUS_PARTIAL_SUCCEED,
JOB_TASK_STATUS_SUCCEED,
JOB_TASK_STATUS_FAILED,
JOB_TASK_STATUS_CANCELLING,
JOB_TASK_STATUS_CANCELLED
JOB_TASK_STATUS_CANCELLED,
JOB_TASK_STATUS_DROPPING,
};
typedef struct STableComInfo {
......
......@@ -42,15 +42,17 @@ typedef struct {
int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt);
int32_t qWorkerProcessQueryMsg(void *qWorkerMgmt, SSchedulerQueryMsg *msg, SRpcMsg *rsp);
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg **rsp);
int32_t qWorkerProcessReadyMsg(void *qWorkerMgmt, SSchedulerReadyMsg *msg, SRpcMsg *rsp);
int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp);
int32_t qWorkerProcessStatusMsg(void *qWorkerMgmt, SSchedulerStatusMsg *msg, SRpcMsg *rsp);
int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp);
int32_t qWorkerProcessFetchMsg(void *qWorkerMgmt, SSchedulerFetchMsg *msg, SRpcMsg *rsp);
int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp);
int32_t qWorkerProcessCancelMsg(void *qWorkerMgmt, SSchedulerCancelMsg *msg, SRpcMsg *rsp);
int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp);
int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp);
void qWorkerDestroy(void **qWorkerMgmt);
......
......@@ -314,6 +314,11 @@ int32_t* taosGetErrno();
#define TSDB_CODE_QRY_INVALID_TIME_CONDITION TAOS_DEF_ERROR_CODE(0, 0x070D) //"invalid time condition")
#define TSDB_CODE_QRY_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x070E) //"System error")
#define TSDB_CODE_QRY_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0x070F) //"invalid input")
#define TSDB_CODE_QRY_SCH_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0710) //"Scheduler not exist")
#define TSDB_CODE_QRY_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0711) //"Task not exist")
#define TSDB_CODE_QRY_TASK_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0712) //"Task already exist")
#define TSDB_CODE_QRY_RES_CACHE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0713) //"Task result cache not exist")
#define TSDB_CODE_QRY_TASK_CANCELLED TAOS_DEF_ERROR_CODE(0, 0x0714) //"Task cancelled")
// grant
......
......@@ -33,6 +33,8 @@ typedef void (*_hash_free_fn_t)(void *);
#define HASH_INDEX(v, c) ((v) & ((c)-1))
#define HASH_NODE_EXIST(code) (code == -2)
/**
* murmur hash algorithm
* @key usually string
......
......@@ -45,6 +45,10 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_MQ_SET_CUR] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_RES_READY] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_TASKS_STATUS] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CANCEL_TASK] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_TASK] = dndProcessVnodeFetchMsg;
// msg from client to mnode
pMgmt->msgFp[TSDB_MSG_TYPE_CONNECT] = dndProcessMnodeReadMsg;
......
......@@ -29,6 +29,8 @@
#include "catalog.h"
#include "tep.h"
#include "trpc.h"
#include "stub.h"
#include "addr_any.h"
typedef struct SAppInstInfo {
int64_t numOfConns;
......@@ -86,6 +88,27 @@ void sendCreateDbMsg(void *shandle, SEpSet *pEpSet) {
ASSERT_EQ(rpcRsp.code, 0);
}
void __rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
SUseDbRsp *rspMsg = NULL; //todo
return;
}
void initTestEnv() {
static Stub stub;
stub.set(rpcSendRecv, __rpcSendRecv);
{
AddrAny any("libtransport.so");
std::map<std::string,void*> result;
any.get_global_func_addr_dynsym("^rpcSendRecv$", result);
for (const auto& f : result) {
stub.set(f.second, __rpcSendRecv);
}
}
}
}
TEST(testCase, normalCase) {
......
......@@ -20,6 +20,8 @@
extern "C" {
#endif
#include "tlockfree.h"
#define QWORKER_DEFAULT_SCHEDULER_NUMBER 10000
#define QWORKER_DEFAULT_RES_CACHE_NUMBER 10000
#define QWORKER_DEFAULT_SCH_TASK_NUMBER 10000
......@@ -30,36 +32,63 @@ enum {
QW_READY_RESPONSED,
};
typedef struct SQWorkerTaskStatus {
int8_t status;
int8_t ready;
enum {
QW_TASK_INFO_STATUS = 1,
QW_TASK_INFO_READY,
};
enum {
QW_READ = 1,
QW_WRITE,
};
typedef struct SQWorkerTaskStatus {
SRWLatch lock;
int32_t code;
int8_t status;
int8_t ready;
bool cancel;
bool drop;
} SQWorkerTaskStatus;
typedef struct SQWorkerResCache {
SRWLatch lock;
void *data;
} SQWorkerResCache;
typedef struct SQWorkerSchTaskStatus {
typedef struct SQWorkerSchStatus {
int32_t lastAccessTs; // timestamp in second
SHashObj *taskStatus; // key:queryId+taskId, value: SQWorkerTaskStatus
} SQWorkerSchTaskStatus;
SRWLatch tasksLock;
SHashObj *tasksHash; // key:queryId+taskId, value: SQWorkerTaskStatus
} SQWorkerSchStatus;
// Qnode/Vnode level task management
typedef struct SQWorkerMgmt {
SQWorkerCfg cfg;
SHashObj *scheduleHash; //key: schedulerId, value: SQWorkerSchTaskStatus
SRWLatch schLock;
SRWLatch resLock;
SHashObj *schHash; //key: schedulerId, value: SQWorkerSchStatus
SHashObj *resHash; //key: queryId+taskId, value: SQWorkerResCache
} SQWorkerMgmt;
#define QW_TASK_DONE(status) (status == JOB_TASK_STATUS_SUCCEED || status == JOB_TASK_STATUS_FAILED || status == status == JOB_TASK_STATUS_CANCELLED)
#define QW_GOT_RES_DATA(data) (false)
#define QW_LOW_RES_DATA(data) (false)
#define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code))
#define QW_TASK_ALREADY_EXIST(code) (TSDB_CODE_QRY_TASK_ALREADY_EXIST == (code))
#define QW_TASK_READY_RESP(status) (status == JOB_TASK_STATUS_SUCCEED || status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED || status == JOB_TASK_STATUS_PARTIAL_SUCCEED)
#define QW_SET_QTID(id, qid, tid) do { *(uint64_t *)(id) = (qid); *(uint64_t *)((char *)(id) + sizeof(qid)) = (tid); } while (0)
#define QW_GET_QTID(id, qid, tid) do { (qid) = *(uint64_t *)(id); (tid) = *(uint64_t *)((char *)(id) + sizeof(qid)); } while (0)
#define QW_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define QW_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define QW_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); terrno = _code; return _code; } } while (0)
#define QW_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define QW_LOCK(type, _lock) (QW_READ == (type) ? taosRLockLatch(_lock) : taosWLockLatch(_lock))
#define QW_UNLOCK(type, _lock) (QW_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock))
#ifdef __cplusplus
}
......
......@@ -4,69 +4,79 @@
#include "qworkerInt.h"
#include "planner.h"
int32_t qwAddTaskStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, int8_t taskStatus) {
SQWorkerTaskStatus tStatus = {0};
tStatus.status = taskStatus;
char id[sizeof(queryId) + sizeof(taskId)] = {0};
QW_SET_QTID(id, queryId, taskId);
int32_t qwCheckStatusSwitch(int8_t oriStatus, int8_t newStatus) {
int32_t code = 0;
SQWorkerSchTaskStatus *schStatus = taosHashGet(mgmt->scheduleHash, &schedulerId, sizeof(schedulerId));
if (NULL == schStatus) {
SQWorkerSchTaskStatus newSchStatus = {0};
newSchStatus.taskStatus = taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (NULL == newSchStatus.taskStatus) {
qError("taosHashInit %d failed", mgmt->cfg.maxSchTaskNum);
return TSDB_CODE_QRY_OUT_OF_MEMORY;
if (oriStatus == newStatus) {
if (newStatus == JOB_TASK_STATUS_CANCELLING) {
return TSDB_CODE_SUCCESS;
}
if (0 != taosHashPut(newSchStatus.taskStatus, id, sizeof(id), &tStatus, sizeof(tStatus))) {
qError("taosHashPut schedulerId[%"PRIx64"]queryId[%"PRIx64"] taskId[%"PRIx64"] to scheduleHash failed", schedulerId, queryId, taskId);
taosHashCleanup(newSchStatus.taskStatus);
return TSDB_CODE_QRY_APP_ERROR;
}
newSchStatus.lastAccessTs = taosGetTimestampSec();
if (0 != taosHashPut(mgmt->scheduleHash, &schedulerId, sizeof(schedulerId), &newSchStatus, sizeof(newSchStatus))) {
qError("taosHashPut schedulerId[%"PRIx64"] to scheduleHash failed", schedulerId);
taosHashCleanup(newSchStatus.taskStatus);
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
switch (oriStatus) {
case JOB_TASK_STATUS_NULL:
if (newStatus != JOB_TASK_STATUS_EXECUTING && newStatus != JOB_TASK_STATUS_FAILED ) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
break;
case JOB_TASK_STATUS_NOT_START:
if (newStatus != JOB_TASK_STATUS_EXECUTING && newStatus != JOB_TASK_STATUS_FAILED) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
break;
case JOB_TASK_STATUS_EXECUTING:
if (newStatus != JOB_TASK_STATUS_SUCCEED && newStatus != JOB_TASK_STATUS_FAILED && newStatus != JOB_TASK_STATUS_CANCELLING) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
break;
case JOB_TASK_STATUS_PARTIAL_SUCCEED:
if (newStatus != JOB_TASK_STATUS_EXECUTING && newStatus != JOB_TASK_STATUS_CANCELLING) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
break;
case JOB_TASK_STATUS_SUCCEED:
case JOB_TASK_STATUS_FAILED:
case JOB_TASK_STATUS_CANCELLING:
if (newStatus != JOB_TASK_STATUS_CANCELLED) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
break;
case JOB_TASK_STATUS_CANCELLED:
default:
qError("invalid task status:%d", oriStatus);
return TSDB_CODE_QRY_APP_ERROR;
}
return TSDB_CODE_SUCCESS;
}
schStatus->lastAccessTs = taosGetTimestampSec();
return TSDB_CODE_SUCCESS;
if (0 != taosHashPut(schStatus->taskStatus, id, sizeof(id), &tStatus, sizeof(tStatus))) {
qError("taosHashPut schedulerId[%"PRIx64"]queryId[%"PRIx64"] taskId[%"PRIx64"] to scheduleHash failed", schedulerId, queryId, taskId);
return TSDB_CODE_QRY_APP_ERROR;
}
_return:
return TSDB_CODE_SUCCESS;
qError("invalid task status:%d", oriStatus);
QW_ERR_RET(code);
}
int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, SQWorkerTaskStatus **taskStatus) {
SQWorkerSchTaskStatus *schStatus = taosHashGet(mgmt->scheduleHash, &schedulerId, sizeof(schedulerId));
if (NULL == schStatus) {
qError("no scheduler for schedulerId[%"PRIx64"]", schedulerId);
return TSDB_CODE_QRY_APP_ERROR;
}
schStatus->lastAccessTs = taosGetTimestampSec();
int32_t qwUpdateTaskInfo(SQWorkerTaskStatus *task, int8_t type, void *data) {
int32_t code = 0;
char id[sizeof(queryId) + sizeof(taskId)] = {0};
QW_SET_QTID(id, queryId, taskId);
SQWorkerTaskStatus *tStatus = taosHashGet(schStatus->taskStatus, id, sizeof(id));
if (NULL == tStatus) {
qError("no task status for schedulerId[%"PRIx64"] queryId[%"PRIx64"] taskId[%"PRIx64"]", schedulerId, queryId, taskId);
return TSDB_CODE_QRY_APP_ERROR;
switch (type) {
case QW_TASK_INFO_STATUS: {
int8_t newStatus = *(int8_t *)data;
QW_ERR_RET(qwCheckStatusSwitch(task->status, newStatus));
task->status = newStatus;
break;
}
default:
qError("uknown task info type:%d", type);
return TSDB_CODE_QRY_APP_ERROR;
}
*taskStatus = tStatus;
return TSDB_CODE_SUCCESS;
}
......@@ -76,12 +86,16 @@ int32_t qwAddTaskResult(SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, v
SQWorkerResCache resCache = {0};
resCache.data = data;
QW_LOCK(QW_WRITE, &mgmt->resLock);
if (0 != taosHashPut(mgmt->resHash, id, sizeof(id), &resCache, sizeof(SQWorkerResCache))) {
QW_UNLOCK(QW_WRITE, &mgmt->resLock);
qError("taosHashPut queryId[%"PRIx64"] taskId[%"PRIx64"] to resHash failed", queryId, taskId);
return TSDB_CODE_QRY_APP_ERROR;
}
QW_UNLOCK(QW_WRITE, &mgmt->resLock);
return TSDB_CODE_SUCCESS;
}
......@@ -101,62 +115,693 @@ int32_t qwGetTaskResult(SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, v
return TSDB_CODE_SUCCESS;
}
int32_t qwUpdateSchLastAccess(SQWorkerMgmt *mgmt, uint64_t schedulerId) {
SQWorkerSchTaskStatus *schStatus = taosHashGet(mgmt->scheduleHash, &schedulerId, sizeof(schedulerId));
if (NULL == schStatus) {
qError("no scheduler for schedulerId[%"PRIx64"]", schedulerId);
return TSDB_CODE_QRY_APP_ERROR;
static FORCE_INLINE int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t schedulerId, SQWorkerSchStatus **sch) {
QW_LOCK(rwType, &mgmt->schLock);
*sch = taosHashGet(mgmt->schHash, &schedulerId, sizeof(schedulerId));
if (NULL == (*sch)) {
QW_LOCK(rwType, &mgmt->schLock);
return TSDB_CODE_QRY_SCH_NOT_EXIST;
}
schStatus->lastAccessTs = taosGetTimestampSec();
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE int32_t qwInsertAndAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t schedulerId, SQWorkerSchStatus **sch) {
SQWorkerSchStatus newSch = {0};
newSch.tasksHash = taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (NULL == newSch.tasksHash) {
qError("taosHashInit %d failed", mgmt->cfg.maxSchTaskNum);
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
while (true) {
QW_LOCK(QW_WRITE, &mgmt->schLock);
int32_t code = taosHashPut(mgmt->schHash, &schedulerId, sizeof(schedulerId), &newSch, sizeof(newSch));
if (0 != code) {
if (!HASH_NODE_EXIST(code)) {
QW_UNLOCK(QW_WRITE, &mgmt->schLock);
qError("taosHashPut schedulerId[%"PRIx64"] to scheduleHash failed", schedulerId);
taosHashCleanup(newSch.tasksHash);
return TSDB_CODE_QRY_APP_ERROR;
}
}
QW_UNLOCK(QW_WRITE, &mgmt->schLock);
if (TSDB_CODE_SUCCESS == qwAcquireScheduler(rwType, mgmt, schedulerId, sch)) {
taosHashCleanup(newSch.tasksHash);
return TSDB_CODE_SUCCESS;
}
}
return TSDB_CODE_SUCCESS;
}
int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, SSchedulerStatusRsp **rsp) {
SQWorkerSchTaskStatus *schStatus = taosHashGet(mgmt->scheduleHash, &schedulerId, sizeof(schedulerId));
if (NULL == schStatus) {
qError("no scheduler for schedulerId[%"PRIx64"]", schedulerId);
return TSDB_CODE_QRY_APP_ERROR;
static FORCE_INLINE void qwReleaseScheduler(int32_t rwType, SQWorkerMgmt *mgmt) {
QW_UNLOCK(rwType, &mgmt->schLock);
}
static FORCE_INLINE int32_t qwAcquireTask(int32_t rwType, SQWorkerSchStatus *sch, uint64_t queryId, uint64_t taskId, SQWorkerTaskStatus **task) {
char id[sizeof(queryId) + sizeof(taskId)] = {0};
QW_SET_QTID(id, queryId, taskId);
QW_LOCK(rwType, &sch->tasksLock);
*task = taosHashGet(sch->tasksHash, id, sizeof(id));
if (NULL == (*task)) {
QW_UNLOCK(rwType, &sch->tasksLock);
return TSDB_CODE_QRY_TASK_NOT_EXIST;
}
schStatus->lastAccessTs = taosGetTimestampSec();
return TSDB_CODE_SUCCESS;
}
int32_t i = 0;
int32_t taskNum = taosHashGetSize(schStatus->taskStatus);
static FORCE_INLINE int32_t qwInsertAndAcquireTask(int32_t rwType, SQWorkerSchStatus *sch, uint64_t queryId, uint64_t taskId, int8_t status, bool *inserted, SQWorkerTaskStatus **task) {
char id[sizeof(queryId) + sizeof(taskId)] = {0};
QW_SET_QTID(id, queryId, taskId);
while (true) {
*inserted = false;
QW_LOCK(QW_WRITE, &sch->tasksLock);
int32_t code = taosHashPut(sch->tasksHash, id, sizeof(id), &status, sizeof(status));
if (0 != code) {
QW_UNLOCK(QW_WRITE, &sch->tasksLock);
if (HASH_NODE_EXIST(code)) {
if (qwAcquireTask(rwType, sch, queryId, taskId, task)) {
continue;
}
break;
} else {
qError("taosHashPut queryId[%"PRIx64"] taskId[%"PRIx64"] to scheduleHash failed", queryId, taskId);
return TSDB_CODE_QRY_APP_ERROR;
}
}
QW_UNLOCK(QW_WRITE, &sch->tasksLock);
*inserted = true;
if (TSDB_CODE_SUCCESS == qwAcquireTask(rwType, sch, queryId, taskId, task)) {
return TSDB_CODE_SUCCESS;
}
}
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE void qwReleaseTask(int32_t rwType, SQWorkerSchStatus *sch) {
QW_UNLOCK(rwType, &sch->tasksLock);
}
static FORCE_INLINE int32_t qwAcquireTaskResCache(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, SQWorkerResCache **res) {
char id[sizeof(queryId) + sizeof(taskId)] = {0};
QW_SET_QTID(id, queryId, taskId);
QW_LOCK(rwType, &mgmt->resLock);
*res = taosHashGet(mgmt->resHash, id, sizeof(id));
if (NULL == (*res)) {
QW_UNLOCK(rwType, &mgmt->resLock);
return TSDB_CODE_QRY_RES_CACHE_NOT_EXIST;
}
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE void qwReleaseTaskResCache(int32_t rwType, SQWorkerMgmt *mgmt) {
QW_UNLOCK(rwType, &mgmt->resLock);
}
int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, SSchedulerStatusRsp **rsp) {
SQWorkerSchStatus *schStatus = NULL;
int32_t taskNum = 0;
if (qwAcquireScheduler(QW_READ, mgmt, schedulerId, &schStatus)) {
qWarn("no scheduler for schedulerId[%"PRIx64"]", schedulerId);
} else {
schStatus->lastAccessTs = taosGetTimestampSec();
QW_LOCK(QW_READ, &schStatus->tasksLock);
taskNum = taosHashGetSize(schStatus->tasksHash);
}
int32_t size = sizeof(SSchedulerStatusRsp) + sizeof((*rsp)->status[0]) * taskNum;
*rsp = calloc(1, size);
if (NULL == *rsp) {
qError("calloc %d failed", size);
if (schStatus) {
QW_UNLOCK(QW_READ, &schStatus->tasksLock);
qwReleaseScheduler(QW_READ, mgmt);
}
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
void *key = NULL;
size_t keyLen = 0;
void *pIter = taosHashIterate(schStatus->taskStatus, NULL);
while (pIter) {
SQWorkerTaskStatus *taskStatus = (SQWorkerTaskStatus *)pIter;
taosHashGetKey(pIter, &key, &keyLen);
int32_t i = 0;
QW_GET_QTID(key, (*rsp)->status[i].queryId, (*rsp)->status[i].taskId);
(*rsp)->status[i].status = taskStatus->status;
pIter = taosHashIterate(schStatus->taskStatus, pIter);
}
if (schStatus) {
void *pIter = taosHashIterate(schStatus->tasksHash, NULL);
while (pIter) {
SQWorkerTaskStatus *taskStatus = (SQWorkerTaskStatus *)pIter;
taosHashGetKey(pIter, &key, &keyLen);
QW_GET_QTID(key, (*rsp)->status[i].queryId, (*rsp)->status[i].taskId);
(*rsp)->status[i].status = taskStatus->status;
pIter = taosHashIterate(schStatus->tasksHash, pIter);
}
}
if (schStatus) {
QW_UNLOCK(QW_READ, &schStatus->tasksLock);
qwReleaseScheduler(QW_READ, mgmt);
}
(*rsp)->num = taskNum;
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildRspMsg(void *data, int32_t msgType);
int32_t qwUpdateSchLastAccess(SQWorkerMgmt *mgmt, uint64_t schedulerId) {
SQWorkerSchStatus *schStatus = NULL;
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &schStatus));
schStatus->lastAccessTs = taosGetTimestampSec();
qwReleaseScheduler(QW_READ, mgmt);
return TSDB_CODE_SUCCESS;
}
int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, int8_t *taskStatus) {
SQWorkerSchStatus *sch = NULL;
SQWorkerTaskStatus *task = NULL;
int32_t code = 0;
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch));
QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task));
*taskStatus = task->status;
_return:
if (task) {
qwReleaseTask(QW_READ, sch);
}
if (sch) {
qwReleaseScheduler(QW_READ, mgmt);
}
QW_RET(code);
}
int32_t qwSwitchTaskStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, int8_t taskStatus) {
SQWorkerSchStatus *sch = NULL;
SQWorkerTaskStatus *task = NULL;
int32_t code = 0;
bool inserted = false;
if (qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)) {
if (qwCheckStatusSwitch(JOB_TASK_STATUS_NULL, taskStatus)) {
qError("switch status error, not start to %d", taskStatus);
QW_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
QW_ERR_RET(qwInsertAndAcquireScheduler(QW_READ, mgmt, schedulerId, &sch));
}
if (qwAcquireTask(QW_READ, sch, queryId, taskId, &task)) {
if (qwCheckStatusSwitch(JOB_TASK_STATUS_NOT_START, taskStatus)) {
qwReleaseScheduler(QW_READ, mgmt);
qError("switch status error, not start to %d", taskStatus);
QW_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
QW_ERR_JRET(qwInsertAndAcquireTask(QW_READ, sch, queryId, taskId, taskStatus, &inserted, &task));
if (inserted) {
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
return TSDB_CODE_SUCCESS;
}
QW_LOCK(QW_WRITE, &task->lock);
code = qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &taskStatus);
QW_UNLOCK(QW_WRITE, &task->lock);
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
QW_RET(code);
}
QW_LOCK(QW_WRITE, &task->lock);
code = qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &taskStatus);
QW_UNLOCK(QW_WRITE, &task->lock);
_return:
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
QW_RET(code);
}
int32_t qwCancelTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId) {
SQWorkerSchStatus *sch = NULL;
SQWorkerTaskStatus *task = NULL;
int32_t code = 0;
if (TSDB_CODE_SUCCESS != qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)) {
QW_ERR_RET(qwSwitchTaskStatus(mgmt, schedulerId, queryId, taskId, JOB_TASK_STATUS_NOT_START));
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch));
}
if (qwAcquireTask(QW_READ, sch, queryId, taskId, &task)) {
code = qwSwitchTaskStatus(mgmt, schedulerId, queryId, taskId, JOB_TASK_STATUS_NOT_START);
if (code) {
qwReleaseScheduler(QW_READ, mgmt);
QW_ERR_RET(code);
}
QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task));
}
QW_LOCK(QW_WRITE, &task->lock);
task->cancel = true;
int8_t oriStatus = task->status;
int8_t newStatus = 0;
if (task->status == JOB_TASK_STATUS_CANCELLED || task->status == JOB_TASK_STATUS_NOT_START || task->status == JOB_TASK_STATUS_CANCELLING || task->status == JOB_TASK_STATUS_DROPPING) {
QW_UNLOCK(QW_WRITE, &task->lock);
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
return TSDB_CODE_SUCCESS;
} else if (task->status == JOB_TASK_STATUS_FAILED || task->status == JOB_TASK_STATUS_SUCCEED || task->status == JOB_TASK_STATUS_PARTIAL_SUCCEED) {
newStatus = JOB_TASK_STATUS_CANCELLED;
QW_ERR_JRET(qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &newStatus));
} else {
newStatus = JOB_TASK_STATUS_CANCELLING;
QW_ERR_JRET(qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &newStatus));
}
QW_UNLOCK(QW_WRITE, &task->lock);
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
if (oriStatus == JOB_TASK_STATUS_EXECUTING) {
//TODO call executer to cancel subquery async
}
return TSDB_CODE_SUCCESS;
_return:
if (task) {
QW_UNLOCK(QW_WRITE, &task->lock);
qwReleaseTask(QW_READ, sch);
}
if (sch) {
qwReleaseScheduler(QW_READ, mgmt);
}
QW_RET(code);
}
int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId) {
SQWorkerSchStatus *sch = NULL;
SQWorkerTaskStatus *task = NULL;
int32_t code = 0;
char id[sizeof(queryId) + sizeof(taskId)] = {0};
QW_SET_QTID(id, queryId, taskId);
QW_LOCK(QW_WRITE, &mgmt->resLock);
if (mgmt->resHash) {
taosHashRemove(mgmt->resHash, id, sizeof(id));
}
QW_UNLOCK(QW_WRITE, &mgmt->resLock);
if (TSDB_CODE_SUCCESS != qwAcquireScheduler(QW_WRITE, mgmt, schedulerId, &sch)) {
qWarn("scheduler %"PRIx64" doesn't exist", schedulerId);
return TSDB_CODE_SUCCESS;
}
if (qwAcquireTask(QW_WRITE, sch, queryId, taskId, &task)) {
qwReleaseScheduler(QW_WRITE, mgmt);
qWarn("scheduler %"PRIx64" queryId %"PRIx64" taskId:%"PRIx64" doesn't exist", schedulerId, queryId, taskId);
return TSDB_CODE_SUCCESS;
}
taosHashRemove(sch->tasksHash, id, sizeof(id));
qwReleaseTask(QW_WRITE, sch);
qwReleaseScheduler(QW_WRITE, mgmt);
return TSDB_CODE_SUCCESS;
}
int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId) {
SQWorkerSchStatus *sch = NULL;
SQWorkerTaskStatus *task = NULL;
int32_t code = 0;
if (TSDB_CODE_SUCCESS != qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)) {
qWarn("scheduler %"PRIx64" doesn't exist", schedulerId);
return TSDB_CODE_SUCCESS;
}
if (qwAcquireTask(QW_READ, sch, queryId, taskId, &task)) {
qwReleaseScheduler(QW_READ, mgmt);
qWarn("scheduler %"PRIx64" queryId %"PRIx64" taskId:%"PRIx64" doesn't exist", schedulerId, queryId, taskId);
return TSDB_CODE_SUCCESS;
}
QW_LOCK(QW_WRITE, &task->lock);
task->drop = true;
int8_t oriStatus = task->status;
int8_t newStatus = 0;
if (task->status == JOB_TASK_STATUS_EXECUTING) {
newStatus = JOB_TASK_STATUS_CANCELLING;
QW_ERR_JRET(qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &newStatus));
} else if (task->status == JOB_TASK_STATUS_CANCELLING || task->status == JOB_TASK_STATUS_DROPPING || task->status == JOB_TASK_STATUS_NOT_START) {
QW_UNLOCK(QW_WRITE, &task->lock);
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
return TSDB_CODE_SUCCESS;
} else {
QW_UNLOCK(QW_WRITE, &task->lock);
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
QW_ERR_RET(qwDropTask(mgmt, schedulerId, queryId, taskId));
return TSDB_CODE_SUCCESS;
}
QW_UNLOCK(QW_WRITE, &task->lock);
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
if (oriStatus == JOB_TASK_STATUS_EXECUTING) {
//TODO call executer to cancel subquery async
}
return TSDB_CODE_SUCCESS;
_return:
if (task) {
QW_UNLOCK(QW_WRITE, &task->lock);
qwReleaseTask(QW_READ, sch);
}
if (sch) {
qwReleaseScheduler(QW_READ, mgmt);
}
QW_RET(code);
}
int32_t qwBuildAndSendQueryRsp(SRpcMsg *pMsg, int32_t code) {
}
int32_t qwBuildAndSendReadyRsp(SRpcMsg *pMsg, int32_t code) {
}
int32_t qwBuildAndSendStatusRsp(SRpcMsg *pMsg, SSchedulerStatusRsp *sStatus) {
}
int32_t qwBuildAndSendFetchRsp(SRpcMsg *pMsg, void *data) {
}
int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg) {
}
int32_t qwBuildAndSendDropRsp(SRpcMsg *pMsg) {
}
int32_t qwCheckAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg, int32_t rspCode) {
SQWorkerSchStatus *sch = NULL;
SQWorkerTaskStatus *task = NULL;
int32_t code = 0;
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch));
QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task));
QW_LOCK(QW_WRITE, &task->lock);
if (QW_READY_NOT_RECEIVED == task->ready) {
QW_UNLOCK(QW_WRITE, &task->lock);
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
return TSDB_CODE_SUCCESS;
} else if (QW_READY_RECEIVED == task->ready) {
QW_ERR_JRET(qwBuildAndSendReadyRsp(pMsg, rspCode));
task->ready = QW_READY_RESPONSED;
} else if (QW_READY_RESPONSED == task->ready) {
qError("query response already send");
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} else {
assert(0);
}
_return:
if (task) {
QW_UNLOCK(QW_WRITE, &task->lock);
}
if (sch) {
qwReleaseTask(QW_READ, sch);
}
qwReleaseScheduler(QW_READ, mgmt);
QW_RET(code);
}
int32_t qwSetAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) {
SQWorkerSchStatus *sch = NULL;
SQWorkerTaskStatus *task = NULL;
int32_t code = 0;
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch));
QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task));
QW_LOCK(QW_WRITE, &task->lock);
if (QW_TASK_READY_RESP(task->status)) {
QW_ERR_JRET(qwBuildAndSendReadyRsp(pMsg, task->code));
task->ready = QW_READY_RESPONSED;
} else {
task->ready = QW_READY_RECEIVED;
QW_UNLOCK(QW_WRITE, &task->lock);
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
return TSDB_CODE_SUCCESS;
}
_return:
if (task) {
QW_UNLOCK(QW_WRITE, &task->lock);
}
if (sch) {
qwReleaseTask(QW_READ, sch);
}
qwReleaseScheduler(QW_READ, mgmt);
QW_RET(code);
}
int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, bool *needStop) {
SQWorkerSchStatus *sch = NULL;
SQWorkerTaskStatus *task = NULL;
int32_t code = 0;
int8_t status = JOB_TASK_STATUS_CANCELLED;
*needStop = false;
if (qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)) {
return TSDB_CODE_SUCCESS;
}
if (qwAcquireTask(QW_READ, sch, queryId, taskId, &task)) {
qwReleaseScheduler(QW_READ, mgmt);
return TSDB_CODE_SUCCESS;
}
QW_LOCK(QW_READ, &task->lock);
if ((!task->cancel) && (!task->drop)) {
QW_UNLOCK(QW_READ, &task->lock);
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
return TSDB_CODE_SUCCESS;
}
QW_UNLOCK(QW_READ, &task->lock);
*needStop = true;
if (task->cancel) {
QW_LOCK(QW_WRITE, &task->lock);
qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &status);
QW_UNLOCK(QW_WRITE, &task->lock);
} else if (task->drop) {
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
qwDropTask(mgmt, schedulerId, queryId, taskId);
}
return TSDB_CODE_SUCCESS;
}
int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) {
SQWorkerSchStatus *sch = NULL;
SQWorkerTaskStatus *task = NULL;
int32_t code = 0;
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch));
QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task));
QW_LOCK(QW_READ, &task->lock);
if (task->status != JOB_TASK_STATUS_EXECUTING && task->status != JOB_TASK_STATUS_PARTIAL_SUCCEED && task->status != JOB_TASK_STATUS_SUCCEED) {
qError("invalid status %d for fetch", task->status);
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
if (QW_GOT_RES_DATA(res->data)) {
QW_ERR_JRET(qwBuildAndSendFetchRsp(pMsg, res->data));
if (QW_LOW_RES_DATA(res->data)) {
if (task->status == JOB_TASK_STATUS_PARTIAL_SUCCEED) {
//TODO add query back to queue
}
}
} else {
if (task->status != JOB_TASK_STATUS_EXECUTING) {
qError("invalid status %d for fetch without res", task->status);
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
//TODO SET FLAG FOR QUERY TO SEND RSP WHEN RES READY
}
_return:
if (task) {
QW_UNLOCK(QW_READ, &task->lock);
}
if (sch) {
qwReleaseTask(QW_READ, sch);
}
qwReleaseScheduler(QW_READ, mgmt);
QW_RET(code);
}
int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, int8_t status, int32_t errCode) {
SQWorkerSchStatus *sch = NULL;
SQWorkerTaskStatus *task = NULL;
int32_t code = 0;
int8_t newStatus = JOB_TASK_STATUS_CANCELLED;
code = qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch);
if (code) {
qError("schedulerId:%"PRIx64" not in cache", schedulerId);
QW_ERR_RET(code);
}
code = qwAcquireTask(QW_READ, sch, queryId, taskId, &task);
if (code) {
qwReleaseScheduler(QW_READ, mgmt);
qError("schedulerId:%"PRIx64" queryId:%"PRIx64" taskId:%"PRIx64" not in cache", schedulerId, queryId, taskId);
QW_ERR_RET(code);
}
if (task->cancel) {
QW_LOCK(QW_WRITE, &task->lock);
qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &newStatus);
QW_UNLOCK(QW_WRITE, &task->lock);
} else if (task->drop) {
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
qwDropTask(mgmt, schedulerId, queryId, taskId);
return TSDB_CODE_SUCCESS;
} else {
QW_LOCK(QW_WRITE, &task->lock);
qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &status);
task->code = errCode;
QW_UNLOCK(QW_WRITE, &task->lock);
}
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt) {
SQWorkerMgmt *mgmt = calloc(1, sizeof(SQWorkerMgmt));
if (NULL == mgmt) {
qError("calloc %d failed", (int32_t)sizeof(SQWorkerMgmt));
return TSDB_CODE_QRY_OUT_OF_MEMORY;
QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
if (cfg) {
......@@ -167,16 +812,16 @@ int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt) {
mgmt->cfg.maxSchTaskNum = QWORKER_DEFAULT_SCH_TASK_NUMBER;
}
mgmt->scheduleHash = taosHashInit(mgmt->cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == mgmt->scheduleHash) {
mgmt->schHash = taosHashInit(mgmt->cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
if (NULL == mgmt->schHash) {
tfree(mgmt);
QW_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler hash failed", mgmt->cfg.maxSchedulerNum);
}
mgmt->resHash = taosHashInit(mgmt->cfg.maxResCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
mgmt->resHash = taosHashInit(mgmt->cfg.maxResCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (NULL == mgmt->resHash) {
taosHashCleanup(mgmt->scheduleHash);
mgmt->scheduleHash = NULL;
taosHashCleanup(mgmt->schHash);
mgmt->schHash = NULL;
tfree(mgmt);
QW_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d res cache hash failed", mgmt->cfg.maxResCacheNum);
......@@ -187,99 +832,179 @@ int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt) {
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessQueryMsg(void *qWorkerMgmt, SSchedulerQueryMsg *msg, SRpcMsg *rsp) {
if (NULL == qWorkerMgmt || NULL == msg || NULL == rsp) {
return TSDB_CODE_QRY_INVALID_INPUT;
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg **rsp) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == rsp) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SSubQueryMsg *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
bool queryDone = false;
bool queryRsp = false;
bool needStop = false;
SSubplan *plan = NULL;
SQWorkerTaskStatus *tStatus = NULL;
int32_t code = 0;
int32_t code = qStringToSubplan(msg->msg, &plan);
QW_ERR_JRET(qwCheckTaskCancelDrop(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, &needStop));
if (needStop) {
qWarn("task need stop");
QW_ERR_RET(TSDB_CODE_QRY_TASK_CANCELLED);
}
code = qStringToSubplan(msg->msg, &plan);
if (TSDB_CODE_SUCCESS != code) {
qError("schId:%"PRIx64",qId:%"PRIx64",taskId:%"PRIx64" string to subplan failed, code:%d", msg->schedulerId, msg->queryId, msg->taskId, code);
return code;
QW_ERR_JRET(code);
}
//TODO call executer to init subquery
QW_ERR_JRET(qwAddTaskStatus(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, JOB_TASK_STATUS_EXECUTING));
//TODO call executer to init subquery
code = 0; // return error directly
//TODO call executer to init subquery
if (code) {
QW_ERR_JRET(code);
} else {
QW_ERR_JRET(qwSwitchTaskStatus(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, JOB_TASK_STATUS_EXECUTING));
}
QW_ERR_JRET(qwBuildRspMsg(NULL, TSDB_MSG_TYPE_QUERY_RSP));
QW_ERR_JRET(qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_SUCCESS));
queryRsp = true;
//TODO call executer to execute subquery
code = 0;
code = 0;
void *data = NULL;
queryDone = false;
//TODO call executer to execute subquery
QW_ERR_JRET(qwGetTaskStatus(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, &tStatus));
tStatus->status = (code) ? JOB_TASK_STATUS_FAILED : JOB_TASK_STATUS_SUCCEED;
if (code) {
QW_ERR_JRET(code);
} else {
QW_ERR_JRET(qwAddTaskResult(qWorkerMgmt, msg->queryId, msg->taskId, data));
QW_ERR_JRET(qwAddTaskResult(qWorkerMgmt, msg->queryId, msg->taskId, data));
QW_ERR_JRET(qwSwitchTaskStatus(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, JOB_TASK_STATUS_PARTIAL_SUCCEED));
}
_return:
if (tStatus && QW_TASK_DONE(tStatus->status) && QW_READY_RECEIVED == tStatus->ready) {
QW_ERR_RET(qwBuildRspMsg(NULL, TSDB_MSG_TYPE_RES_READY_RSP));
if (queryRsp) {
code = qwCheckAndSendReadyRsp(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, pMsg, code);
} else {
code = qwBuildAndSendQueryRsp(pMsg, code);
}
int8_t status = 0;
if (TSDB_CODE_SUCCESS != code || queryDone) {
if (code) {
status = JOB_TASK_STATUS_FAILED; //TODO set CANCELLED from code
} else {
status = JOB_TASK_STATUS_SUCCEED;
}
qDestroySubplan(plan);
qwQueryPostProcess(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, status, code);
}
return code;
QW_RET(code);
}
int32_t qWorkerProcessReadyMsg(void *qWorkerMgmt, SSchedulerReadyMsg *msg, SRpcMsg *rsp){
if (NULL == qWorkerMgmt || NULL == msg || NULL == rsp) {
int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp){
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == rsp) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
SQWorkerTaskStatus *tStatus = NULL;
QW_ERR_RET(qwGetTaskStatus(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, &tStatus));
if (QW_TASK_DONE(tStatus->status)) {
QW_ERR_RET(qwBuildRspMsg(tStatus, TSDB_MSG_TYPE_RES_READY_RSP));
} else {
tStatus->ready = QW_READY_RECEIVED;
return TSDB_CODE_SUCCESS;
}
SResReadyMsg *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
tStatus->ready = QW_READY_RESPONSED;
QW_ERR_RET(qwSetAndSendReadyRsp(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, pMsg));
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessStatusMsg(void *qWorkerMgmt, SSchedulerStatusMsg *msg, SRpcMsg *rsp) {
if (NULL == qWorkerMgmt || NULL == msg || NULL == rsp) {
int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == rsp) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
SSchTasksStatusMsg *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SSchedulerStatusRsp *sStatus = NULL;
QW_ERR_RET(qwGetSchTasksStatus(qWorkerMgmt, msg->schedulerId, &sStatus));
QW_ERR_RET(qwBuildAndSendStatusRsp(pMsg, sStatus));
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessFetchMsg(void *qWorkerMgmt, SSchedulerFetchMsg *msg, SRpcMsg *rsp) {
if (NULL == qWorkerMgmt || NULL == msg || NULL == rsp) {
int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == rsp) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
SResFetchMsg *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
QW_ERR_RET(qwUpdateSchLastAccess(qWorkerMgmt, msg->schedulerId));
void *data = NULL;
SQWorkerResCache *res = NULL;
int32_t code = 0;
QW_ERR_RET(qwGetTaskResult(qWorkerMgmt, msg->queryId, msg->taskId, &data));
QW_ERR_RET(qwAcquireTaskResCache(QW_READ, qWorkerMgmt, msg->queryId, msg->taskId, &res));
QW_ERR_JRET(qwHandleFetch(res, qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, pMsg));
_return:
qwReleaseTaskResCache(QW_READ, qWorkerMgmt);
QW_RET(code);
}
int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == rsp) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
STaskCancelMsg *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
QW_ERR_RET(qwCancelTask(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId));
QW_ERR_RET(qwBuildAndSendCancelRsp(pMsg));
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == rsp) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
STaskDropMsg *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
QW_ERR_RET(qwCancelDropTask(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId));
QW_ERR_RET(qwBuildRspMsg(data, TSDB_MSG_TYPE_FETCH_RSP));
QW_ERR_RET(qwBuildAndSendDropRsp(pMsg));
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessCancelMsg(void *qWorkerMgmt, SSchedulerCancelMsg *msg, SRpcMsg *rsp);
void qWorkerDestroy(void **qWorkerMgmt) {
if (NULL == qWorkerMgmt || NULL == *qWorkerMgmt) {
......
......@@ -296,44 +296,45 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) {
}
int32_t len = strlen(task->msg);
msgSize = sizeof(SSchedulerQueryMsg) + len;
msgSize = sizeof(SSubQueryMsg) + len + 1;
msg = calloc(1, msgSize);
if (NULL == msg) {
qError("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SSchedulerQueryMsg *pMsg = msg;
SSubQueryMsg *pMsg = msg;
pMsg->schedulerId = htobe64(schMgmt.schedulerId);
pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId);
pMsg->contentLen = htonl(len);
memcpy(pMsg->msg, task->msg, len);
pMsg->msg[len] = 0;
break;
}
case TSDB_MSG_TYPE_RES_READY: {
msgSize = sizeof(SSchedulerReadyMsg);
msgSize = sizeof(SResReadyMsg);
msg = calloc(1, msgSize);
if (NULL == msg) {
qError("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SSchedulerReadyMsg *pMsg = msg;
SResReadyMsg *pMsg = msg;
pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId);
break;
}
case TSDB_MSG_TYPE_FETCH: {
msgSize = sizeof(SSchedulerFetchMsg);
msgSize = sizeof(SResFetchMsg);
msg = calloc(1, msgSize);
if (NULL == msg) {
qError("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SSchedulerFetchMsg *pMsg = msg;
SResFetchMsg *pMsg = msg;
pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId);
break;
......
......@@ -323,7 +323,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INCONSISTAN, "File inconsistance in
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_TIME_CONDITION, "One valid time range condition expected")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_SYS_ERROR, "System error")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_INPUT, "invalid input")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_SCH_NOT_EXIST, "Scheduler not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_NOT_EXIST, "Task not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_ALREADY_EXIST, "Task already exist")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST, "Task result cache not exist")
// grant
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, "License expired")
......
......@@ -291,7 +291,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
// enable resize
__rd_unlock(&pHashObj->lock, pHashObj->type);
return pHashObj->enableUpdate ? 0 : -1;
return pHashObj->enableUpdate ? 0 : -2;
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册