提交 ae5541f5 编写于 作者: D dapan1121

feature/qnode

上级 48cc2702
...@@ -51,7 +51,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_QUERY, "mq-query" ) ...@@ -51,7 +51,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_QUERY, "mq-query" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_SET, "mq-set" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_SET, "mq-set" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_RSP_READY, "rsp-ready" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_RES_READY, "res-ready" )
// message from client to mnode // message from client to mnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONNECT, "connect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONNECT, "connect" )
...@@ -1090,11 +1090,13 @@ typedef struct SSchedulerQueryMsg { ...@@ -1090,11 +1090,13 @@ typedef struct SSchedulerQueryMsg {
} SSchedulerQueryMsg; } SSchedulerQueryMsg;
typedef struct SSchedulerReadyMsg { typedef struct SSchedulerReadyMsg {
uint64_t schedulerId;
uint64_t queryId; uint64_t queryId;
uint64_t taskId; uint64_t taskId;
} SSchedulerReadyMsg; } SSchedulerReadyMsg;
typedef struct SSchedulerFetchMsg { typedef struct SSchedulerFetchMsg {
uint64_t schedulerId;
uint64_t queryId; uint64_t queryId;
uint64_t taskId; uint64_t taskId;
} SSchedulerFetchMsg; } SSchedulerFetchMsg;
...@@ -1103,6 +1105,24 @@ typedef struct SSchedulerStatusMsg { ...@@ -1103,6 +1105,24 @@ typedef struct SSchedulerStatusMsg {
uint64_t schedulerId; uint64_t schedulerId;
} SSchedulerStatusMsg; } SSchedulerStatusMsg;
typedef struct STaskStatus {
uint64_t queryId;
uint64_t taskId;
int8_t status;
} STaskStatus;
typedef struct SSchedulerStatusRsp {
uint32_t num;
STaskStatus status[];
} SSchedulerStatusRsp;
typedef struct SSchedulerCancelMsg {
uint64_t schedulerId;
uint64_t queryId;
uint64_t taskId;
} SSchedulerCancelMsg;
#pragma pack(pop) #pragma pack(pop)
......
...@@ -24,6 +24,15 @@ extern "C" { ...@@ -24,6 +24,15 @@ extern "C" {
#include "thash.h" #include "thash.h"
#include "tlog.h" #include "tlog.h"
enum {
JOB_TASK_STATUS_NOT_START = 1,
JOB_TASK_STATUS_EXECUTING,
JOB_TASK_STATUS_SUCCEED,
JOB_TASK_STATUS_FAILED,
JOB_TASK_STATUS_CANCELLING,
JOB_TASK_STATUS_CANCELLED
};
typedef struct STableComInfo { typedef struct STableComInfo {
uint8_t numOfTags; // the number of tags in schema uint8_t numOfTags; // the number of tags in schema
uint8_t precision; // the number of precision uint8_t precision; // the number of precision
......
...@@ -13,16 +13,19 @@ ...@@ -13,16 +13,19 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef _TD_QUERY_WORKER_H_ #ifndef _TD_QWORKER_H_
#define _TD_QUERY_WORKER_H_ #define _TD_QWORKER_H_
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "trpc.h"
typedef struct SQWorkerCfg { typedef struct SQWorkerCfg {
uint32_t maxSchedulerNum; uint32_t maxSchedulerNum;
uint32_t maxResCacheNum; uint32_t maxResCacheNum;
uint32_t maxSchTaskNum;
} SQWorkerCfg; } SQWorkerCfg;
typedef struct { typedef struct {
...@@ -37,14 +40,23 @@ typedef struct { ...@@ -37,14 +40,23 @@ typedef struct {
} SQWorkerStat; } SQWorkerStat;
int32_t qWorkerInit(SQWorkerCfg *cfg); int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt);
int32_t qWorkerProcessQueryMsg(void *qWorkerMgmt, SSchedulerQueryMsg *msg, SRpcMsg *rsp);
int32_t qWorkerProcessReadyMsg(void *qWorkerMgmt, SSchedulerReadyMsg *msg, SRpcMsg *rsp);
int32_t qWorkerProcessStatusMsg(void *qWorkerMgmt, SSchedulerStatusMsg *msg, SRpcMsg *rsp);
int32_t qWorkerProcessFetchMsg(void *qWorkerMgmt, SSchedulerFetchMsg *msg, SRpcMsg *rsp);
int32_t qWorkerProcessQueryMsg(char *msg, int32_t msgLen, int32_t *code, char **rspMsg); int32_t qWorkerProcessCancelMsg(void *qWorkerMgmt, SSchedulerCancelMsg *msg, SRpcMsg *rsp);
void qWorkerDestroy(void **qWorkerMgmt);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
#endif /*_TD_QUERY_WORKER_H_*/ #endif /*_TD_QWORKER_H_*/
...@@ -193,11 +193,10 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p); ...@@ -193,11 +193,10 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p);
/** /**
* Get the corresponding key information for a given data in hash table * Get the corresponding key information for a given data in hash table
* @param pHashObj
* @param data * @param data
* @return * @return
*/ */
int32_t taosHashGetKey(SHashObj *pHashObj, void *data, void** key, size_t* keyLen); int32_t taosHashGetKey(void *data, void** key, size_t* keyLen);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -11,3 +11,4 @@ add_subdirectory(executor) ...@@ -11,3 +11,4 @@ add_subdirectory(executor)
add_subdirectory(planner) add_subdirectory(planner)
add_subdirectory(function) add_subdirectory(function)
add_subdirectory(qcom) add_subdirectory(qcom)
add_subdirectory(qworker)
...@@ -1175,7 +1175,7 @@ int32_t filterAddGroupUnitFromNode(SFilterInfo *info, tExprNode* tree, SArray *g ...@@ -1175,7 +1175,7 @@ int32_t filterAddGroupUnitFromNode(SFilterInfo *info, tExprNode* tree, SArray *g
void* key = NULL; void* key = NULL;
len = 0; len = 0;
taosHashGetKey((SHashObj *)data, p, &key, &len); taosHashGetKey(p, &key, &len);
void *fdata = NULL; void *fdata = NULL;
if (IS_VAR_DATA_TYPE(type)) { if (IS_VAR_DATA_TYPE(type)) {
......
#include "queryInt.h"
#include "query.h"
#include "taosmsg.h"
#include "queryWorker.h"
#include "qWorkerInt.h"
SQWorkerMgmt qWorkerMgmt = {0};
int32_t qWorkerInit(SQWorkerCfg *cfg) {
if (cfg) {
qWorkerMgmt.cfg = *cfg;
} else {
qWorkerMgmt.cfg.maxSchedulerNum = QWORKER_DEFAULT_SCHEDULER_NUMBER;
qWorkerMgmt.cfg.maxResCacheNum = QWORKER_DEFAULT_RES_CACHE_NUMBER;
}
qWorkerMgmt.scheduleHash = taosHashInit(qWorkerMgmt.cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == qWorkerMgmt.scheduleHash) {
SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler hash failed", qWorkerMgmt.cfg.maxSchedulerNum);
}
qWorkerMgmt.resHash = taosHashInit(qWorkerMgmt.cfg.maxResCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (NULL == qWorkerMgmt.resHash) {
taosHashCleanup(qWorkerMgmt.scheduleHash);
qWorkerMgmt.scheduleHash = NULL;
SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d res cache hash failed", qWorkerMgmt.cfg.maxResCacheNum);
}
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessQueryMsg(SSchedulerQueryMsg *msg, int32_t msgLen, int32_t *code, char **rspMsg) {
// TODO
return 0;
}
aux_source_directory(src QWORKER_SRC)
add_library(qworker ${QWORKER_SRC})
target_include_directories(
qworker
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/qworker"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
qworker
PRIVATE os util transport planner qcom
)
...@@ -22,13 +22,44 @@ extern "C" { ...@@ -22,13 +22,44 @@ extern "C" {
#define QWORKER_DEFAULT_SCHEDULER_NUMBER 10000 #define QWORKER_DEFAULT_SCHEDULER_NUMBER 10000
#define QWORKER_DEFAULT_RES_CACHE_NUMBER 10000 #define QWORKER_DEFAULT_RES_CACHE_NUMBER 10000
#define QWORKER_DEFAULT_SCH_TASK_NUMBER 10000
enum {
QW_READY_NOT_RECEIVED = 0,
QW_READY_RECEIVED,
QW_READY_RESPONSED,
};
typedef struct SQWorkerTaskStatus {
int8_t status;
int8_t ready;
} SQWorkerTaskStatus;
typedef struct SQWorkerResCache {
void *data;
} SQWorkerResCache;
typedef struct SQWorkerSchTaskStatus {
int32_t lastAccessTs; // timestamp in second
SHashObj *taskStatus; // key:queryId+taskId, value: SQWorkerTaskStatus
} SQWorkerSchTaskStatus;
// Qnode/Vnode level task management
typedef struct SQWorkerMgmt { typedef struct SQWorkerMgmt {
SQWorkerCfg cfg; SQWorkerCfg cfg;
SHashObj *scheduleHash; //key: schedulerId, value: SHashObj *scheduleHash; //key: schedulerId, value: SQWorkerSchTaskStatus
SHashObj *resHash; //key: queryId+taskId, value: SHashObj *resHash; //key: queryId+taskId, value: SQWorkerResCache
} SQWorkerMgmt; } SQWorkerMgmt;
#define QW_TASK_DONE(status) (status == JOB_TASK_STATUS_SUCCEED || status == JOB_TASK_STATUS_FAILED || status == status == JOB_TASK_STATUS_CANCELLED)
#define QW_SET_QTID(id, qid, tid) do { *(uint64_t *)(id) = (qid); *(uint64_t *)((char *)(id) + sizeof(qid)) = (tid); } while (0)
#define QW_GET_QTID(id, qid, tid) do { (qid) = *(uint64_t *)(id); (tid) = *(uint64_t *)((char *)(id) + sizeof(qid)); } while (0)
#define QW_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define QW_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define QW_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); terrno = _code; return _code; } } while (0)
#define QW_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#ifdef __cplusplus #ifdef __cplusplus
} }
......
#include "taosmsg.h"
#include "query.h"
#include "qworker.h"
#include "qworkerInt.h"
#include "planner.h"
int32_t qwAddTaskStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, int8_t taskStatus) {
SQWorkerTaskStatus tStatus = {0};
tStatus.status = taskStatus;
char id[sizeof(queryId) + sizeof(taskId)] = {0};
QW_SET_QTID(id, queryId, taskId);
SQWorkerSchTaskStatus *schStatus = taosHashGet(mgmt->scheduleHash, &schedulerId, sizeof(schedulerId));
if (NULL == schStatus) {
SQWorkerSchTaskStatus newSchStatus = {0};
newSchStatus.taskStatus = taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (NULL == newSchStatus.taskStatus) {
qError("taosHashInit %d failed", mgmt->cfg.maxSchTaskNum);
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
if (0 != taosHashPut(newSchStatus.taskStatus, id, sizeof(id), &tStatus, sizeof(tStatus))) {
qError("taosHashPut schedulerId[%"PRIx64"]queryId[%"PRIx64"] taskId[%"PRIx64"] to scheduleHash failed", schedulerId, queryId, taskId);
taosHashCleanup(newSchStatus.taskStatus);
return TSDB_CODE_QRY_APP_ERROR;
}
newSchStatus.lastAccessTs = taosGetTimestampSec();
if (0 != taosHashPut(mgmt->scheduleHash, &schedulerId, sizeof(schedulerId), &newSchStatus, sizeof(newSchStatus))) {
qError("taosHashPut schedulerId[%"PRIx64"] to scheduleHash failed", schedulerId);
taosHashCleanup(newSchStatus.taskStatus);
return TSDB_CODE_QRY_APP_ERROR;
}
return TSDB_CODE_SUCCESS;
}
schStatus->lastAccessTs = taosGetTimestampSec();
if (0 != taosHashPut(schStatus->taskStatus, id, sizeof(id), &tStatus, sizeof(tStatus))) {
qError("taosHashPut schedulerId[%"PRIx64"]queryId[%"PRIx64"] taskId[%"PRIx64"] to scheduleHash failed", schedulerId, queryId, taskId);
return TSDB_CODE_QRY_APP_ERROR;
}
return TSDB_CODE_SUCCESS;
}
int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, SQWorkerTaskStatus **taskStatus) {
SQWorkerSchTaskStatus *schStatus = taosHashGet(mgmt->scheduleHash, &schedulerId, sizeof(schedulerId));
if (NULL == schStatus) {
qError("no scheduler for schedulerId[%"PRIx64"]", schedulerId);
return TSDB_CODE_QRY_APP_ERROR;
}
schStatus->lastAccessTs = taosGetTimestampSec();
char id[sizeof(queryId) + sizeof(taskId)] = {0};
QW_SET_QTID(id, queryId, taskId);
SQWorkerTaskStatus *tStatus = taosHashGet(schStatus->taskStatus, id, sizeof(id));
if (NULL == tStatus) {
qError("no task status for schedulerId[%"PRIx64"] queryId[%"PRIx64"] taskId[%"PRIx64"]", schedulerId, queryId, taskId);
return TSDB_CODE_QRY_APP_ERROR;
}
*taskStatus = tStatus;
return TSDB_CODE_SUCCESS;
}
int32_t qwAddTaskResult(SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, void *data) {
char id[sizeof(queryId) + sizeof(taskId)] = {0};
QW_SET_QTID(id, queryId, taskId);
SQWorkerResCache resCache = {0};
resCache.data = data;
if (0 != taosHashPut(mgmt->resHash, id, sizeof(id), &resCache, sizeof(SQWorkerResCache))) {
qError("taosHashPut queryId[%"PRIx64"] taskId[%"PRIx64"] to resHash failed", queryId, taskId);
return TSDB_CODE_QRY_APP_ERROR;
}
return TSDB_CODE_SUCCESS;
}
int32_t qwGetTaskResult(SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, void **data) {
char id[sizeof(queryId) + sizeof(taskId)] = {0};
QW_SET_QTID(id, queryId, taskId);
SQWorkerResCache *resCache = taosHashGet(mgmt->resHash, id, sizeof(id));
if (NULL == resCache) {
qError("no task res for queryId[%"PRIx64"] taskId[%"PRIx64"]", queryId, taskId);
return TSDB_CODE_QRY_APP_ERROR;
}
*data = resCache->data;
return TSDB_CODE_SUCCESS;
}
int32_t qwUpdateSchLastAccess(SQWorkerMgmt *mgmt, uint64_t schedulerId) {
SQWorkerSchTaskStatus *schStatus = taosHashGet(mgmt->scheduleHash, &schedulerId, sizeof(schedulerId));
if (NULL == schStatus) {
qError("no scheduler for schedulerId[%"PRIx64"]", schedulerId);
return TSDB_CODE_QRY_APP_ERROR;
}
schStatus->lastAccessTs = taosGetTimestampSec();
return TSDB_CODE_SUCCESS;
}
int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, SSchedulerStatusRsp **rsp) {
SQWorkerSchTaskStatus *schStatus = taosHashGet(mgmt->scheduleHash, &schedulerId, sizeof(schedulerId));
if (NULL == schStatus) {
qError("no scheduler for schedulerId[%"PRIx64"]", schedulerId);
return TSDB_CODE_QRY_APP_ERROR;
}
schStatus->lastAccessTs = taosGetTimestampSec();
int32_t i = 0;
int32_t taskNum = taosHashGetSize(schStatus->taskStatus);
int32_t size = sizeof(SSchedulerStatusRsp) + sizeof((*rsp)->status[0]) * taskNum;
*rsp = calloc(1, size);
if (NULL == *rsp) {
qError("calloc %d failed", size);
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
void *key = NULL;
size_t keyLen = 0;
void *pIter = taosHashIterate(schStatus->taskStatus, NULL);
while (pIter) {
SQWorkerTaskStatus *taskStatus = (SQWorkerTaskStatus *)pIter;
taosHashGetKey(pIter, &key, &keyLen);
QW_GET_QTID(key, (*rsp)->status[i].queryId, (*rsp)->status[i].taskId);
(*rsp)->status[i].status = taskStatus->status;
pIter = taosHashIterate(schStatus->taskStatus, pIter);
}
(*rsp)->num = taskNum;
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildRspMsg(void *data, int32_t msgType);
int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt) {
SQWorkerMgmt *mgmt = calloc(1, sizeof(SQWorkerMgmt));
if (NULL == mgmt) {
qError("calloc %d failed", (int32_t)sizeof(SQWorkerMgmt));
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
if (cfg) {
mgmt->cfg = *cfg;
} else {
mgmt->cfg.maxSchedulerNum = QWORKER_DEFAULT_SCHEDULER_NUMBER;
mgmt->cfg.maxResCacheNum = QWORKER_DEFAULT_RES_CACHE_NUMBER;
mgmt->cfg.maxSchTaskNum = QWORKER_DEFAULT_SCH_TASK_NUMBER;
}
mgmt->scheduleHash = taosHashInit(mgmt->cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == mgmt->scheduleHash) {
tfree(mgmt);
QW_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler hash failed", mgmt->cfg.maxSchedulerNum);
}
mgmt->resHash = taosHashInit(mgmt->cfg.maxResCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (NULL == mgmt->resHash) {
taosHashCleanup(mgmt->scheduleHash);
mgmt->scheduleHash = NULL;
tfree(mgmt);
QW_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d res cache hash failed", mgmt->cfg.maxResCacheNum);
}
*qWorkerMgmt = mgmt;
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessQueryMsg(void *qWorkerMgmt, SSchedulerQueryMsg *msg, SRpcMsg *rsp) {
if (NULL == qWorkerMgmt || NULL == msg || NULL == rsp) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
SSubplan *plan = NULL;
SQWorkerTaskStatus *tStatus = NULL;
int32_t code = qStringToSubplan(msg->msg, &plan);
if (TSDB_CODE_SUCCESS != code) {
qError("schId:%"PRIx64",qId:%"PRIx64",taskId:%"PRIx64" string to subplan failed, code:%d", msg->schedulerId, msg->queryId, msg->taskId, code);
return code;
}
//TODO call executer to init subquery
QW_ERR_JRET(qwAddTaskStatus(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, JOB_TASK_STATUS_EXECUTING));
QW_ERR_JRET(qwBuildRspMsg(NULL, TSDB_MSG_TYPE_QUERY_RSP));
//TODO call executer to execute subquery
code = 0;
void *data = NULL;
//TODO call executer to execute subquery
QW_ERR_JRET(qwGetTaskStatus(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, &tStatus));
tStatus->status = (code) ? JOB_TASK_STATUS_FAILED : JOB_TASK_STATUS_SUCCEED;
QW_ERR_JRET(qwAddTaskResult(qWorkerMgmt, msg->queryId, msg->taskId, data));
_return:
if (tStatus && QW_TASK_DONE(tStatus->status) && QW_READY_RECEIVED == tStatus->ready) {
QW_ERR_RET(qwBuildRspMsg(NULL, TSDB_MSG_TYPE_RES_READY_RSP));
}
qDestroySubplan(plan);
return code;
}
int32_t qWorkerProcessReadyMsg(void *qWorkerMgmt, SSchedulerReadyMsg *msg, SRpcMsg *rsp){
if (NULL == qWorkerMgmt || NULL == msg || NULL == rsp) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
SQWorkerTaskStatus *tStatus = NULL;
QW_ERR_RET(qwGetTaskStatus(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, &tStatus));
if (QW_TASK_DONE(tStatus->status)) {
QW_ERR_RET(qwBuildRspMsg(tStatus, TSDB_MSG_TYPE_RES_READY_RSP));
} else {
tStatus->ready = QW_READY_RECEIVED;
return TSDB_CODE_SUCCESS;
}
tStatus->ready = QW_READY_RESPONSED;
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessStatusMsg(void *qWorkerMgmt, SSchedulerStatusMsg *msg, SRpcMsg *rsp) {
if (NULL == qWorkerMgmt || NULL == msg || NULL == rsp) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
SSchedulerStatusRsp *sStatus = NULL;
QW_ERR_RET(qwGetSchTasksStatus(qWorkerMgmt, msg->schedulerId, &sStatus));
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessFetchMsg(void *qWorkerMgmt, SSchedulerFetchMsg *msg, SRpcMsg *rsp) {
if (NULL == qWorkerMgmt || NULL == msg || NULL == rsp) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
QW_ERR_RET(qwUpdateSchLastAccess(qWorkerMgmt, msg->schedulerId));
void *data = NULL;
QW_ERR_RET(qwGetTaskResult(qWorkerMgmt, msg->queryId, msg->taskId, &data));
QW_ERR_RET(qwBuildRspMsg(data, TSDB_MSG_TYPE_FETCH_RSP));
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessCancelMsg(void *qWorkerMgmt, SSchedulerCancelMsg *msg, SRpcMsg *rsp);
void qWorkerDestroy(void **qWorkerMgmt) {
if (NULL == qWorkerMgmt || NULL == *qWorkerMgmt) {
return;
}
SQWorkerMgmt *mgmt = *qWorkerMgmt;
//TODO STOP ALL QUERY
//TODO FREE ALL
tfree(*qWorkerMgmt);
}
...@@ -31,15 +31,6 @@ extern "C" { ...@@ -31,15 +31,6 @@ extern "C" {
#define SCH_MAX_CONDIDATE_EP_NUM TSDB_MAX_REPLICA #define SCH_MAX_CONDIDATE_EP_NUM TSDB_MAX_REPLICA
enum {
SCH_STATUS_NOT_START = 1,
SCH_STATUS_EXECUTING,
SCH_STATUS_SUCCEED,
SCH_STATUS_FAILED,
SCH_STATUS_CANCELLING,
SCH_STATUS_CANCELLED
};
typedef struct SSchedulerMgmt { typedef struct SSchedulerMgmt {
uint64_t taskId; uint64_t taskId;
uint64_t schedulerId; uint64_t schedulerId;
......
...@@ -161,7 +161,7 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { ...@@ -161,7 +161,7 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
SArray *levelPlans = NULL; SArray *levelPlans = NULL;
int32_t levelPlanNum = 0; int32_t levelPlanNum = 0;
level.status = SCH_STATUS_NOT_START; level.status = JOB_TASK_STATUS_NOT_START;
for (int32_t i = 0; i < levelNum; ++i) { for (int32_t i = 0; i < levelNum; ++i) {
level.level = i; level.level = i;
...@@ -191,7 +191,7 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { ...@@ -191,7 +191,7 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
task->taskId = atomic_add_fetch_64(&schMgmt.taskId, 1); task->taskId = atomic_add_fetch_64(&schMgmt.taskId, 1);
task->plan = plan; task->plan = plan;
task->status = SCH_STATUS_NOT_START; task->status = JOB_TASK_STATUS_NOT_START;
if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &task, POINTER_BYTES)) { if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &task, POINTER_BYTES)) {
qError("taosHashPut failed"); qError("taosHashPut failed");
...@@ -292,14 +292,14 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { ...@@ -292,14 +292,14 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) {
SSchedulerQueryMsg *pMsg = msg; SSchedulerQueryMsg *pMsg = msg;
pMsg->schedulerId = schMgmt.schedulerId; pMsg->schedulerId = htobe64(schMgmt.schedulerId);
pMsg->queryId = job->queryId; pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = task->taskId; pMsg->taskId = htobe64(task->taskId);
pMsg->contentLen = len; pMsg->contentLen = htonl(len);
memcpy(pMsg->msg, task->msg, len); memcpy(pMsg->msg, task->msg, len);
break; break;
} }
case TSDB_MSG_TYPE_RSP_READY: { case TSDB_MSG_TYPE_RES_READY: {
msgSize = sizeof(SSchedulerReadyMsg); msgSize = sizeof(SSchedulerReadyMsg);
msg = calloc(1, msgSize); msg = calloc(1, msgSize);
if (NULL == msg) { if (NULL == msg) {
...@@ -308,8 +308,8 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { ...@@ -308,8 +308,8 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) {
} }
SSchedulerReadyMsg *pMsg = msg; SSchedulerReadyMsg *pMsg = msg;
pMsg->queryId = job->queryId; pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = task->taskId; pMsg->taskId = htobe64(task->taskId);
break; break;
} }
case TSDB_MSG_TYPE_FETCH: { case TSDB_MSG_TYPE_FETCH: {
...@@ -321,8 +321,8 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { ...@@ -321,8 +321,8 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) {
} }
SSchedulerFetchMsg *pMsg = msg; SSchedulerFetchMsg *pMsg = msg;
pMsg->queryId = job->queryId; pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = task->taskId; pMsg->taskId = htobe64(task->taskId);
break; break;
} }
default: default:
...@@ -365,7 +365,7 @@ _return: ...@@ -365,7 +365,7 @@ _return:
int32_t schProcessOnJobSuccess(SQueryJob *job) { int32_t schProcessOnJobSuccess(SQueryJob *job) {
job->status = SCH_STATUS_SUCCEED; job->status = JOB_TASK_STATUS_SUCCEED;
if (job->userFetch) { if (job->userFetch) {
SCH_ERR_RET(schFetchFromRemote(job)); SCH_ERR_RET(schFetchFromRemote(job));
...@@ -375,7 +375,7 @@ int32_t schProcessOnJobSuccess(SQueryJob *job) { ...@@ -375,7 +375,7 @@ int32_t schProcessOnJobSuccess(SQueryJob *job) {
} }
int32_t schProcessOnJobFailure(SQueryJob *job) { int32_t schProcessOnJobFailure(SQueryJob *job) {
job->status = SCH_STATUS_FAILED; job->status = JOB_TASK_STATUS_FAILED;
atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0); atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);
...@@ -402,7 +402,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { ...@@ -402,7 +402,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
task->status = SCH_STATUS_SUCCEED; task->status = JOB_TASK_STATUS_SUCCEED;
int32_t parentNum = (int32_t)taosArrayGetSize(task->parents); int32_t parentNum = (int32_t)taosArrayGetSize(task->parents);
if (parentNum == 0) { if (parentNum == 0) {
...@@ -448,7 +448,7 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod ...@@ -448,7 +448,7 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod
if (!needRetry) { if (!needRetry) {
SCH_TASK_ERR_LOG("task failed[%x], no more retry", errCode); SCH_TASK_ERR_LOG("task failed[%x], no more retry", errCode);
job->status = SCH_STATUS_FAILED; job->status = JOB_TASK_STATUS_FAILED;
SCH_ERR_RET(schProcessOnJobFailure(job)); SCH_ERR_RET(schProcessOnJobFailure(job));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -467,13 +467,13 @@ int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, int32 ...@@ -467,13 +467,13 @@ int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, int32
if (rspCode != TSDB_CODE_SUCCESS) { if (rspCode != TSDB_CODE_SUCCESS) {
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode)); SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode));
} else { } else {
code = schAsyncSendMsg(job, task, TSDB_MSG_TYPE_RSP_READY); code = schAsyncSendMsg(job, task, TSDB_MSG_TYPE_RES_READY);
if (code) { if (code) {
goto _task_error; goto _task_error;
} }
} }
break; break;
case TSDB_MSG_TYPE_RSP_READY: case TSDB_MSG_TYPE_RES_READY:
if (rspCode != TSDB_CODE_SUCCESS) { if (rspCode != TSDB_CODE_SUCCESS) {
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode)); SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode));
} else { } else {
...@@ -518,7 +518,7 @@ int32_t schLaunchTask(SQueryJob *job, SQueryTask *task) { ...@@ -518,7 +518,7 @@ int32_t schLaunchTask(SQueryJob *job, SQueryTask *task) {
SCH_ERR_RET(schPushTaskToExecList(job, task)); SCH_ERR_RET(schPushTaskToExecList(job, task));
task->status = SCH_STATUS_EXECUTING; task->status = JOB_TASK_STATUS_EXECUTING;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -530,7 +530,7 @@ int32_t schLaunchJob(SQueryJob *job) { ...@@ -530,7 +530,7 @@ int32_t schLaunchJob(SQueryJob *job) {
SCH_ERR_RET(schLaunchTask(job, task)); SCH_ERR_RET(schLaunchTask(job, task));
} }
job->status = SCH_STATUS_EXECUTING; job->status = JOB_TASK_STATUS_EXECUTING;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -590,7 +590,7 @@ int32_t scheduleQueryJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pM ...@@ -590,7 +590,7 @@ int32_t scheduleQueryJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pM
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
} }
job->status = SCH_STATUS_NOT_START; job->status = JOB_TASK_STATUS_NOT_START;
SCH_ERR_JRET(schLaunchJob(job)); SCH_ERR_JRET(schLaunchJob(job));
...@@ -619,7 +619,7 @@ int32_t scheduleFetchRows(void *pJob, void **data) { ...@@ -619,7 +619,7 @@ int32_t scheduleFetchRows(void *pJob, void **data) {
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
if (job->status == SCH_STATUS_SUCCEED) { if (job->status == JOB_TASK_STATUS_SUCCEED) {
SCH_ERR_JRET(schFetchFromRemote(job)); SCH_ERR_JRET(schFetchFromRemote(job));
} }
...@@ -653,7 +653,7 @@ void scheduleFreeJob(void *pJob) { ...@@ -653,7 +653,7 @@ void scheduleFreeJob(void *pJob) {
return; return;
} }
if (job->status == SCH_STATUS_EXECUTING) { if (job->status == JOB_TASK_STATUS_EXECUTING) {
scheduleCancelJob(pJob); scheduleCancelJob(pJob);
} }
} }
......
...@@ -776,9 +776,16 @@ size_t taosHashGetMemSize(const SHashObj *pHashObj) { ...@@ -776,9 +776,16 @@ size_t taosHashGetMemSize(const SHashObj *pHashObj) {
return (pHashObj->capacity * (sizeof(SHashEntry) + POINTER_BYTES)) + sizeof(SHashNode) * taosHashGetSize(pHashObj) + sizeof(SHashObj); return (pHashObj->capacity * (sizeof(SHashEntry) + POINTER_BYTES)) + sizeof(SHashNode) * taosHashGetSize(pHashObj) + sizeof(SHashObj);
} }
FORCE_INLINE void *taosHashGetDataKey(SHashObj *pHashObj, void *data) { FORCE_INLINE int32_t taosHashGetKey(void *data, void** key, size_t* keyLen) {
if (NULL == data || NULL == key) {
return -1;
}
SHashNode * node = GET_HASH_PNODE(data); SHashNode * node = GET_HASH_PNODE(data);
return GET_HASH_NODE_KEY(node); *key = GET_HASH_NODE_KEY(node);
*keyLen = node->keyLen;
return 0;
} }
FORCE_INLINE uint32_t taosHashGetDataKeyLen(SHashObj *pHashObj, void *data) { FORCE_INLINE uint32_t taosHashGetDataKeyLen(SHashObj *pHashObj, void *data) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册