未验证 提交 a3e8d256 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #10522 from taosdata/feature/qnode

Feature/qnode
......@@ -23,8 +23,6 @@ extern "C" {
#include "catalog.h"
#include "planner.h"
struct SSchJob;
// Scheduler configuration; a pointer to one is the sole argument of schedulerInit().
typedef struct SSchedulerCfg {
// Job-count setting. schedulerInit() hands the stored value to the call that
// creates the scheduler's job container as its first (size) argument.
uint32_t maxJobNum;
} SSchedulerCfg;
......@@ -72,7 +70,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg);
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return
*/
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, const char* sql, SQueryResult *pRes);
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, int64_t *pJob, const char* sql, SQueryResult *pRes);
/**
* Process the query job, generated according to the query physical plan.
......@@ -80,7 +78,7 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, str
* @param pNodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return
*/
int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryDag* pDag, const char* sql, struct SSchJob** pJob);
int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryDag* pDag, const char* sql, int64_t *pJob);
/**
* Fetch query result from the remote query executor
......@@ -88,7 +86,7 @@ int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryDag* pDa
* @param data
* @return
*/
int32_t schedulerFetchRows(struct SSchJob *pJob, void **data);
int32_t schedulerFetchRows(int64_t job, void **data);
/**
......@@ -102,7 +100,7 @@ int32_t schedulerFetchRows(struct SSchJob *pJob, void **data);
* Free the query job
* @param pJob
*/
void schedulerFreeJob(void *pJob);
void schedulerFreeJob(int64_t job);
void schedulerDestroy(void);
......
......@@ -171,7 +171,7 @@ typedef struct SRequestSendRecvBody {
void* fp;
SShowReqInfo showInfo; // todo this attribute will be removed after the query framework being completed.
SDataBuf requestMsg;
struct SSchJob* pQueryJob; // query job, created according to sql query DAG.
int64_t queryJob; // query job, created according to sql query DAG.
struct SQueryDag* pDag; // the query dag, generated according to the sql statement.
SReqResultInfo resInfo;
} SRequestSendRecvBody;
......
......@@ -227,10 +227,10 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList) {
void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.pQueryJob, pRequest->sqlstr, &res);
int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr, &res);
if (code != TSDB_CODE_SUCCESS) {
if (pRequest->body.pQueryJob != NULL) {
schedulerFreeJob(pRequest->body.pQueryJob);
if (pRequest->body.queryJob != 0) {
schedulerFreeJob(pRequest->body.queryJob);
}
pRequest->code = code;
......@@ -240,8 +240,8 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList)
if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
pRequest->body.resInfo.numOfRows = res.numOfRows;
if (pRequest->body.pQueryJob != NULL) {
schedulerFreeJob(pRequest->body.pQueryJob);
if (pRequest->body.queryJob != 0) {
schedulerFreeJob(pRequest->body.queryJob);
}
}
......@@ -494,7 +494,7 @@ void* doFetchRow(SRequestObj* pRequest) {
}
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
int32_t code = schedulerFetchRows(pRequest->body.pQueryJob, (void**)&pResInfo->pData);
int32_t code = schedulerFetchRows(pRequest->body.queryJob, (void**)&pResInfo->pData);
if (code != TSDB_CODE_SUCCESS) {
pRequest->code = code;
return NULL;
......
......@@ -36,6 +36,11 @@ enum {
SCH_WRITE,
};
// Pair of transport pointers bundled into one argument for schAsyncSendMsg(),
// which receives it as `void *transport` and casts it back to SSchTrans*.
typedef struct SSchTrans {
// filled from SSchJob.transport by schBuildAndSendMsg()
void *transInst;
// filled from SSchTask.handle by schBuildAndSendMsg()
void *transHandle;
} SSchTrans;
// Placeholder type: declares no fields yet.
// NOTE(review): presumably reserved for per-API statistics - confirm before adding members.
typedef struct SSchApiStat {
} SSchApiStat;
......@@ -59,12 +64,13 @@ typedef struct SSchedulerMgmt {
uint64_t taskId; // sequential taksId
uint64_t sId; // schedulerId
SSchedulerCfg cfg;
SHashObj *jobs; // key: queryId, value: SQueryJob*
int32_t jobRef;
SSchedulerStat stat;
} SSchedulerMgmt;
// Context attached to an outgoing async message and handed back to the response callback.
// schAsyncSendMsg() allocates one with calloc(), fills the three fields from the job and
// task, and stores it in SMsgSendInfo.param; schHandleCallback() tfree()s it on exit.
typedef struct SSchCallbackParam {
// copied from SSchJob.queryId; used in log output by the callback
uint64_t queryId;
// copied from SSchJob.refId; the callback passes it to taosAcquireRef()/taosReleaseRef()
int64_t refId;
// copied from SSchTask.taskId; key the callback looks up in the job's execTasks hash
uint64_t taskId;
} SSchCallbackParam;
......@@ -75,7 +81,8 @@ typedef struct SSchLevel {
int32_t taskFailed;
int32_t taskSucceed;
int32_t taskNum;
SArray *subTasks; // Element is SQueryTask
int32_t taskLaunchIdx; // launch startup index
SArray *subTasks; // Element is SQueryTask
} SSchLevel;
typedef struct SSchTask {
......@@ -105,6 +112,7 @@ typedef struct SSchJobAttr {
} SSchJobAttr;
typedef struct SSchJob {
int64_t refId;
uint64_t queryId;
SSchJobAttr attr;
int32_t levelNum;
......@@ -119,7 +127,6 @@ typedef struct SSchJob {
SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask*
SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask*
int32_t ref;
int8_t status;
SQueryNodeAddr resNode;
tsem_t rspSem;
......@@ -168,6 +175,8 @@ typedef struct SSchJob {
static int32_t schLaunchTask(SSchJob *job, SSchTask *task);
static int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType);
SSchJob *schAcquireJob(int64_t refId);
int32_t schReleaseJob(int64_t refId);
#ifdef __cplusplus
}
......
......@@ -17,12 +17,17 @@
#include "tmsg.h"
#include "query.h"
#include "catalog.h"
#include "tref.h"
// Pair of transport pointers bundled into one argument for schAsyncSendMsg(),
// which receives it as `void *transport` and casts it back to SSchTrans*.
typedef struct SSchTrans {
// filled from SSchJob.transport by schBuildAndSendMsg()
void *transInst;
// filled from SSchTask.handle by schBuildAndSendMsg()
void *transHandle;
}SSchTrans;
static SSchedulerMgmt schMgmt = {0};
SSchedulerMgmt schMgmt = {0};
// Look up a job by its reference id in the scheduler's job reference set
// (schMgmt.jobRef) and return the result of taosAcquireRef() as an SSchJob pointer.
// Callers in this code base check the result against NULL and pair a successful
// call with schReleaseJob() on the same refId.
FORCE_INLINE SSchJob *schAcquireJob(int64_t refId) {
  void *pEntry = taosAcquireRef(schMgmt.jobRef, refId);
  return (SSchJob *)pEntry;
}
// Counterpart of schAcquireJob(): forwards refId to taosReleaseRef() on the
// scheduler's job reference set and returns that call's result unchanged.
FORCE_INLINE int32_t schReleaseJob(int64_t refId) {
  int32_t rc = taosReleaseRef(schMgmt.jobRef, refId);
  return rc;
}
uint64_t schGenTaskId(void) {
return atomic_add_fetch_64(&schMgmt.taskId, 1);
......@@ -886,7 +891,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
}
case TDMT_VND_DROP_TASK_RSP: {
// SHOULD NEVER REACH HERE
SCH_TASK_ELOG("invalid status to handle drop task rsp, ref:%d", atomic_load_32(&pJob->ref));
SCH_TASK_ELOG("invalid status to handle drop task rsp, refId:%" PRIx64, pJob->refId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
break;
}
......@@ -908,28 +913,23 @@ _return:
int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode) {
int32_t code = 0;
SSchCallbackParam *pParam = (SSchCallbackParam *)param;
SSchJob *pJob = NULL;
SSchTask *pTask = NULL;
SSchJob **job = taosHashGet(schMgmt.jobs, &pParam->queryId, sizeof(pParam->queryId));
if (NULL == job || NULL == (*job)) {
qError("QID:%"PRIx64" taosHashGet queryId not exist, may be dropped", pParam->queryId);
SSchJob *pJob = taosAcquireRef(schMgmt.jobRef, pParam->refId);
if (NULL == pJob) {
qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "taosAcquireRef job failed, may be dropped, refId:%" PRIx64, pParam->queryId, pParam->taskId, pParam->refId);
SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
}
pJob = *job;
atomic_add_fetch_32(&pJob->ref, 1);
int32_t s = taosHashGetSize(pJob->execTasks);
if (s <= 0) {
qError("QID:%"PRIx64",TID:%"PRId64" no task in execTask list", pParam->queryId, pParam->taskId);
SCH_JOB_ELOG("empty execTask list, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, pParam->taskId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SSchTask **task = taosHashGet(pJob->execTasks, &pParam->taskId, sizeof(pParam->taskId));
if (NULL == task || NULL == (*task)) {
qError("QID:%"PRIx64",TID:%"PRId64" taosHashGet taskId not exist", pParam->queryId, pParam->taskId);
SCH_JOB_ELOG("task not found in execTask list, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, pParam->taskId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
......@@ -942,7 +942,7 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in
_return:
if (pJob) {
atomic_sub_fetch_32(&pJob->ref, 1);
taosReleaseRef(schMgmt.jobRef, pParam->refId);
}
tfree(param);
......@@ -1003,28 +1003,29 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
}
int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t tId, int32_t msgType, void *msg, uint32_t msgSize) {
int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet* epSet, int32_t msgType, void *msg, uint32_t msgSize) {
int32_t code = 0;
SSchTrans *trans = (SSchTrans *)transport;
SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
qError("QID:%"PRIx64 ",TID:%"PRIx64 " calloc %d failed", qId, tId, (int32_t)sizeof(SMsgSendInfo));
SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SSchCallbackParam *param = calloc(1, sizeof(SSchCallbackParam));
if (NULL == param) {
qError("QID:%"PRIx64 ",TID:%"PRIx64 " calloc %d failed", qId, tId, (int32_t)sizeof(SSchCallbackParam));
SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchCallbackParam));
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
__async_send_cb_fn_t fp = NULL;
SCH_ERR_JRET(schGetCallbackFp(msgType, &fp));
param->queryId = qId;
param->taskId = tId;
param->queryId = pJob->queryId;
param->refId = pJob->refId;
param->taskId = pTask->taskId;
pMsgSendInfo->param = param;
......@@ -1040,7 +1041,7 @@ int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t t
SCH_ERR_JRET(code);
}
qDebug("QID:0x%"PRIx64 ",TID:0x%"PRIx64 " req msg sent, type:%d, %s", qId, tId, msgType, TMSG_INFO(msgType));
SCH_TASK_DLOG("req msg sent, refId:%" PRIx64 ", type:%d, %s", pJob->refId, msgType, TMSG_INFO(msgType));
return TSDB_CODE_SUCCESS;
_return:
......@@ -1160,7 +1161,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
atomic_store_32(&pTask->lastMsgType, msgType);
SSchTrans trans = {.transInst = pJob->transport, .transHandle = pTask->handle};
SCH_ERR_JRET(schAsyncSendMsg(&trans, &epSet, pJob->queryId, pTask->taskId, msgType, msg, msgSize));
SCH_ERR_JRET(schAsyncSendMsg(pJob, pTask, &trans, &epSet, msgType, msg, msgSize));
if (isCandidateAddr) {
SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr));
......@@ -1283,7 +1284,60 @@ void schDropJobAllTasks(SSchJob *pJob) {
schDropTaskInHashList(pJob, pJob->failTasks);
}
static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryDag* pDag, struct SSchJob** job, const char* sql, bool syncSchedule) {
// Cancel a job. Not implemented yet: the body is a stub that performs no work.
//
// @param pJob job to cancel (currently unused)
// @return TSDB_CODE_SUCCESS. An explicit value is returned because callers
//         (e.g. scheduleCancelJob) read the result, and falling off the end of
//         a non-void function whose value is used is undefined behaviour.
int32_t schCancelJob(SSchJob *pJob) {
  //TODO
  //TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST
  (void)pJob;
  return TSDB_CODE_SUCCESS;
}
// Tear down a scheduler job and release the job object itself.
//
// Registered as the free callback of the job reference set in schedulerInit()
// (taosOpenRef(..., schFreeJobImpl)) and also called directly on the error path
// of schExecJobImpl().
//
// @param job  SSchJob pointer passed as void*; NULL is accepted and ignored.
void schFreeJobImpl(void *job) {
  if (NULL == job) {
    return;
  }

  SSchJob *pJob = job;
  uint64_t queryId = pJob->queryId;
  int64_t refId = pJob->refId;

  // A job that is still executing is cancelled before its tasks are dropped.
  if (pJob->status == JOB_TASK_STATUS_EXECUTING) {
    schCancelJob(pJob);
  }

  schDropJobAllTasks(pJob);

  pJob->subPlans = NULL; // it is a reference to pDag->pSubplans

  // Free every task of every level, then the per-level task array.
  int32_t numOfLevels = taosArrayGetSize(pJob->levels);
  for (int32_t i = 0; i < numOfLevels; ++i) {
    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);

    int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
    for (int32_t j = 0; j < numOfTasks; ++j) {
      SSchTask *pTask = taosArrayGet(pLevel->subTasks, j);
      schFreeTask(pTask);
    }

    taosArrayDestroy(pLevel->subTasks);
  }

  taosHashCleanup(pJob->execTasks);
  taosHashCleanup(pJob->failTasks);
  taosHashCleanup(pJob->succTasks);

  taosArrayDestroy(pJob->levels);
  taosArrayDestroy(pJob->nodeList);

  tfree(pJob->res);

  // Emit the log line before releasing the job object: once tfree(pJob) has run,
  // pJob no longer holds a usable address, so printing it afterwards would log
  // an invalid (or cleared) pointer instead of the job that was freed.
  qDebug("QID:0x%"PRIx64" job freed, refId:%" PRIx64 ", pointer:%p", queryId, refId, (void *)pJob);

  tfree(pJob);
}
static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryDag* pDag, int64_t *job, const char* sql, bool syncSchedule) {
qDebug("QID:0x%"PRIx64" job started", pDag->queryId);
if (pNodeList == NULL || (pNodeList && taosArrayGetSize(pNodeList) <= 0)) {
......@@ -1327,21 +1381,20 @@ static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryDag* pDa
tsem_init(&pJob->rspSem, 0, 0);
code = taosHashPut(schMgmt.jobs, &pJob->queryId, sizeof(pJob->queryId), &pJob, POINTER_BYTES);
if (0 != code) {
if (HASH_NODE_EXIST(code)) {
SCH_JOB_ELOG("job already exist, isQueryJob:%d", pJob->attr.queryJob);
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
} else {
SCH_JOB_ELOG("taosHashPut job failed, errno:%d", errno);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pJob->refId = taosAddRef(schMgmt.jobRef, pJob);
if (pJob->refId < 0) {
SCH_JOB_ELOG("taosHashPut job failed, error:%s", tstrerror(terrno));
SCH_ERR_JRET(terrno);
}
SCH_JOB_DLOG("job refId:%" PRIx64, pJob->refId);
pJob->status = JOB_TASK_STATUS_NOT_START;
SCH_ERR_JRET(schLaunchJob(pJob));
*(SSchJob **)job = pJob;
taosAcquireRef(schMgmt.jobRef, pJob->refId);
*job = pJob->refId;
if (syncSchedule) {
SCH_JOB_DLOG("will wait for rsp now, job status:%d", SCH_GET_JOB_STATUS(pJob));
......@@ -1349,25 +1402,20 @@ static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryDag* pDa
}
SCH_JOB_DLOG("job exec done, job status:%d", SCH_GET_JOB_STATUS(pJob));
taosReleaseRef(schMgmt.jobRef, pJob->refId);
return TSDB_CODE_SUCCESS;
_return:
*(SSchJob **)job = NULL;
schedulerFreeJob(pJob);
schFreeJobImpl(pJob);
SCH_RET(code);
}
// Cancel a job. Not implemented yet: the body is a stub that performs no work.
//
// @param pJob job to cancel (currently unused)
// @return TSDB_CODE_SUCCESS. An explicit value is returned because callers
//         (e.g. scheduleCancelJob) read the result, and falling off the end of
//         a non-void function whose value is used is undefined behaviour.
int32_t schCancelJob(SSchJob *pJob) {
  //TODO
  //TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST
  (void)pJob;
  return TSDB_CODE_SUCCESS;
}
int32_t schedulerInit(SSchedulerCfg *cfg) {
if (schMgmt.jobs) {
if (schMgmt.jobRef) {
qError("scheduler already initialized");
return TSDB_CODE_QRY_INVALID_INPUT;
}
......@@ -1381,9 +1429,9 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
} else {
schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER;
}
schMgmt.jobs = taosHashInit(schMgmt.cfg.maxJobNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == schMgmt.jobs) {
schMgmt.jobRef = taosOpenRef(schMgmt.cfg.maxJobNum, schFreeJobImpl);
if (schMgmt.jobRef < 0) {
qError("init schduler jobs failed, num:%u", schMgmt.cfg.maxJobNum);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
......@@ -1398,24 +1446,28 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
return TSDB_CODE_SUCCESS;
}
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, const char* sql, SQueryResult *pRes) {
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, int64_t *pJob, const char* sql, SQueryResult *pRes) {
if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, true));
pRes->code = atomic_load_32(&(*pJob)->errCode);
pRes->numOfRows = (*pJob)->resNumOfRows;
SSchJob *job = taosAcquireRef(schMgmt.jobRef, *pJob);
pRes->code = atomic_load_32(&job->errCode);
pRes->numOfRows = job->resNumOfRows;
taosReleaseRef(schMgmt.jobRef, *pJob);
return TSDB_CODE_SUCCESS;
}
int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryDag* pDag, const char* sql, struct SSchJob** pJob) {
int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryDag* pDag, const char* sql, int64_t *pJob) {
if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SCH_ERR_RET(schExecJobImpl(transport, pNodeList, pDag, pJob, sql, false));
return TSDB_CODE_SUCCESS;
}
......@@ -1541,28 +1593,35 @@ _return:
}
int32_t schedulerFetchRows(SSchJob *pJob, void** pData) {
if (NULL == pJob || NULL == pData) {
int32_t schedulerFetchRows(int64_t job, void** pData) {
if (NULL == pData) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
int32_t code = 0;
atomic_add_fetch_32(&pJob->ref, 1);
SSchJob *pJob = taosAcquireRef(schMgmt.jobRef, job);
if (NULL == pJob) {
qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
int8_t status = SCH_GET_JOB_STATUS(pJob);
if (status == JOB_TASK_STATUS_DROPPING) {
SCH_JOB_ELOG("job is dropping, status:%d", status);
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
taosReleaseRef(schMgmt.jobRef, job);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
if (!SCH_JOB_NEED_FETCH(&pJob->attr)) {
SCH_JOB_ELOG("no need to fetch data, status:%d", SCH_GET_JOB_STATUS(pJob));
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
taosReleaseRef(schMgmt.jobRef, job);
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
if (atomic_val_compare_exchange_8(&pJob->userFetch, 0, 1) != 0) {
SCH_JOB_ELOG("prior fetching not finished, userFetch:%d", atomic_load_8(&pJob->userFetch));
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
taosReleaseRef(schMgmt.jobRef, job);
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) {
......@@ -1588,7 +1647,6 @@ int32_t schedulerFetchRows(SSchJob *pJob, void** pData) {
SCH_ERR_JRET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_SUCCEED));
}
_return:
while (true) {
*pData = atomic_load_ptr(&pJob->res);
......@@ -1609,96 +1667,47 @@ _return:
SCH_JOB_DLOG("empty res and set query complete, code:%x", code);
}
atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0);
SCH_JOB_DLOG("fetch done, totalRows:%d, code:%s", pJob->resNumOfRows, tstrerror(code));
atomic_sub_fetch_32(&pJob->ref, 1);
_return:
atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0);
taosReleaseRef(schMgmt.jobRef, job);
SCH_RET(code);
}
int32_t scheduleCancelJob(void *job) {
SSchJob *pJob = (SSchJob *)job;
atomic_add_fetch_32(&pJob->ref, 1);
int32_t scheduleCancelJob(int64_t job) {
SSchJob *pJob = taosAcquireRef(schMgmt.jobRef, job);
if (NULL == pJob) {
qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
int32_t code = schCancelJob(pJob);
atomic_sub_fetch_32(&pJob->ref, 1);
taosReleaseRef(schMgmt.jobRef, job);
SCH_RET(code);
}
void schedulerFreeJob(void *job) {
if (NULL == job) {
void schedulerFreeJob(int64_t job) {
SSchJob *pJob = taosAcquireRef(schMgmt.jobRef, job);
if (NULL == pJob) {
qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job);
return;
}
SSchJob *pJob = job;
uint64_t queryId = pJob->queryId;
bool setJobFree = false;
if (SCH_GET_JOB_STATUS(pJob) > 0) {
if (0 != taosHashRemove(schMgmt.jobs, &pJob->queryId, sizeof(pJob->queryId))) {
SCH_JOB_ELOG("taosHashRemove job from list failed, may already freed, pJob:%p", pJob);
return;
}
SCH_JOB_DLOG("job removed from list, no further ref, ref:%d", atomic_load_32(&pJob->ref));
while (true) {
int32_t ref = atomic_load_32(&pJob->ref);
if (0 == ref) {
break;
} else if (ref > 0) {
if (1 == ref && atomic_load_8(&pJob->userFetch) > 0 && !setJobFree) {
schProcessOnJobDropped(pJob, TSDB_CODE_QRY_JOB_FREED);
setJobFree = true;
}
usleep(1);
} else {
SCH_JOB_ELOG("invalid job ref number, ref:%d", ref);
break;
}
}
SCH_JOB_DLOG("job no ref now, status:%d", SCH_GET_JOB_STATUS(pJob));
if (pJob->status == JOB_TASK_STATUS_EXECUTING) {
schCancelJob(pJob);
}
schDropJobAllTasks(pJob);
if (atomic_load_8(&pJob->userFetch) > 0) {
schProcessOnJobDropped(pJob, TSDB_CODE_QRY_JOB_FREED);
}
pJob->subPlans = NULL; // it is a reference to pDag->pSubplans
int32_t numOfLevels = taosArrayGetSize(pJob->levels);
for(int32_t i = 0; i < numOfLevels; ++i) {
SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
for(int32_t j = 0; j < numOfTasks; ++j) {
SSchTask* pTask = taosArrayGet(pLevel->subTasks, j);
schFreeTask(pTask);
}
SCH_JOB_DLOG("start to remove job from jobRef list, refId:%" PRIx64, job);
taosArrayDestroy(pLevel->subTasks);
if (taosRemoveRef(schMgmt.jobRef, job)) {
SCH_JOB_ELOG("remove job from job list failed, refId:%" PRIx64, job);
}
taosHashCleanup(pJob->execTasks);
taosHashCleanup(pJob->failTasks);
taosHashCleanup(pJob->succTasks);
taosArrayDestroy(pJob->levels);
taosArrayDestroy(pJob->nodeList);
tfree(pJob->res);
tfree(pJob);
qDebug("QID:0x%"PRIx64" job freed", queryId);
}
void schedulerFreeTaskList(SArray *taskList) {
......@@ -1716,9 +1725,17 @@ void schedulerFreeTaskList(SArray *taskList) {
}
void schedulerDestroy(void) {
if (schMgmt.jobs) {
taosHashCleanup(schMgmt.jobs); //TODO
schMgmt.jobs = NULL;
if (schMgmt.jobRef) {
SSchJob *pJob = taosIterateRef(schMgmt.jobRef, 0);
while (pJob) {
taosRemoveRef(schMgmt.jobRef, pJob->refId);
pJob = taosIterateRef(schMgmt.jobRef, pJob->refId);
}
taosCloseRef(schMgmt.jobRef);
schMgmt.jobRef = 0;
}
}
......@@ -38,15 +38,15 @@
#include "schedulerInt.h"
#include "stub.h"
#include "addr_any.h"
#include "tref.h"
namespace {
extern "C" int32_t schHandleResponseMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode);
extern "C" int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode);
struct SSchJob *pInsertJob = NULL;
struct SSchJob *pQueryJob = NULL;
int64_t insertJobRefId = 0;
int64_t queryJobRefId = 0;
uint64_t schtMergeTemplateId = 0x4;
uint64_t schtFetchTaskId = 0;
......@@ -65,6 +65,7 @@ void schtInitLogFile() {
tsAsyncLog = 0;
qDebugFlag = 159;
strcpy(tsLogDir, "/var/log/taos");
if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) {
printf("failed to open log file in directory:%s\n", tsLogDir);
......@@ -255,34 +256,40 @@ void schtSetAsyncSendMsgToServer() {
void *schtSendRsp(void *param) {
SSchJob *job = NULL;
SSchJob *pJob = NULL;
int64_t job = 0;
int32_t code = 0;
while (true) {
job = *(SSchJob **)param;
job = *(int64_t *)param;
if (job) {
break;
}
usleep(1000);
}
pJob = schAcquireJob(job);
void *pIter = taosHashIterate(job->execTasks, NULL);
void *pIter = taosHashIterate(pJob->execTasks, NULL);
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
SSubmitRsp rsp = {0};
rsp.affectedRows = 10;
schHandleResponseMsg(job, task, TDMT_VND_SUBMIT_RSP, (char *)&rsp, sizeof(rsp), 0);
schHandleResponseMsg(pJob, task, TDMT_VND_SUBMIT_RSP, (char *)&rsp, sizeof(rsp), 0);
pIter = taosHashIterate(job->execTasks, pIter);
pIter = taosHashIterate(pJob->execTasks, pIter);
}
schReleaseJob(job);
return NULL;
}
void *schtCreateFetchRspThread(void *param) {
struct SSchJob* job = (struct SSchJob*)param;
int64_t job = *(int64_t *)param;
SSchJob* pJob = schAcquireJob(job);
sleep(1);
......@@ -291,8 +298,10 @@ void *schtCreateFetchRspThread(void *param) {
rsp->completed = 1;
rsp->numOfRows = 10;
code = schHandleResponseMsg(job, job->fetchTask, TDMT_VND_FETCH_RSP, (char *)rsp, sizeof(*rsp), 0);
code = schHandleResponseMsg(pJob, pJob->fetchTask, TDMT_VND_FETCH_RSP, (char *)rsp, sizeof(*rsp), 0);
schReleaseJob(job);
assert(code == 0);
}
......@@ -329,9 +338,9 @@ void *schtFetchRspThread(void *aa) {
void schtFreeQueryJob(int32_t freeThread) {
static uint32_t freeNum = 0;
SSchJob *job = atomic_load_ptr(&pQueryJob);
int64_t job = queryJobRefId;
if (job && atomic_val_compare_exchange_ptr(&pQueryJob, job, NULL)) {
if (job && atomic_val_compare_exchange_64(&queryJobRefId, job, 0)) {
schedulerFreeJob(job);
if (freeThread) {
if (++freeNum % schtTestPrintNum == 0) {
......@@ -360,7 +369,7 @@ void* schtRunJobThread(void *aa) {
schtSetExecNode();
schtSetAsyncSendMsgToServer();
SSchJob *job = NULL;
SSchJob *pJob = NULL;
SSchCallbackParam *param = NULL;
SHashObj *execTasks = NULL;
SDataBuf dataBuf = {0};
......@@ -376,24 +385,29 @@ void* schtRunJobThread(void *aa) {
qnodeAddr.port = 6031;
taosArrayPush(qnodeList, &qnodeAddr);
code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &job);
code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &queryJobRefId);
assert(code == 0);
pJob = schAcquireJob(queryJobRefId);
if (NULL == pJob) {
taosArrayDestroy(qnodeList);
schtFreeQueryDag(&dag);
continue;
}
execTasks = taosHashInit(5, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
void *pIter = taosHashIterate(job->execTasks, NULL);
void *pIter = taosHashIterate(pJob->execTasks, NULL);
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
schtFetchTaskId = task->taskId - 1;
taosHashPut(execTasks, &task->taskId, sizeof(task->taskId), task, sizeof(*task));
pIter = taosHashIterate(job->execTasks, pIter);
pIter = taosHashIterate(pJob->execTasks, pIter);
}
param = (SSchCallbackParam *)calloc(1, sizeof(*param));
param->queryId = schtQueryId;
pQueryJob = job;
param->refId = queryJobRefId;
param->queryId = pJob->queryId;
pIter = taosHashIterate(execTasks, NULL);
while (pIter) {
......@@ -412,8 +426,9 @@ void* schtRunJobThread(void *aa) {
param = (SSchCallbackParam *)calloc(1, sizeof(*param));
param->queryId = schtQueryId;
param->refId = queryJobRefId;
param->queryId = pJob->queryId;
pIter = taosHashIterate(execTasks, NULL);
while (pIter) {
SSchTask *task = (SSchTask *)pIter;
......@@ -431,7 +446,8 @@ void* schtRunJobThread(void *aa) {
param = (SSchCallbackParam *)calloc(1, sizeof(*param));
param->queryId = schtQueryId;
param->refId = queryJobRefId;
param->queryId = pJob->queryId;
pIter = taosHashIterate(execTasks, NULL);
while (pIter) {
......@@ -450,7 +466,8 @@ void* schtRunJobThread(void *aa) {
param = (SSchCallbackParam *)calloc(1, sizeof(*param));
param->queryId = schtQueryId;
param->refId = queryJobRefId;
param->queryId = pJob->queryId;
pIter = taosHashIterate(execTasks, NULL);
while (pIter) {
......@@ -470,7 +487,7 @@ void* schtRunJobThread(void *aa) {
atomic_store_32(&schtStartFetch, 1);
void *data = NULL;
code = schedulerFetchRows(pQueryJob, &data);
code = schedulerFetchRows(queryJobRefId, &data);
assert(code == 0 || code);
if (0 == code) {
......@@ -480,12 +497,13 @@ void* schtRunJobThread(void *aa) {
}
data = NULL;
code = schedulerFetchRows(pQueryJob, &data);
code = schedulerFetchRows(queryJobRefId, &data);
assert(code == 0 || code);
schtFreeQueryJob(0);
taosHashCleanup(execTasks);
taosArrayDestroy(qnodeList);
schtFreeQueryDag(&dag);
......@@ -516,7 +534,7 @@ TEST(queryTest, normalCase) {
char *dbname = "1.db1";
char *tablename = "table1";
SVgroupInfo vgInfo = {0};
SSchJob *pJob = NULL;
int64_t job = 0;
SQueryDag dag = {0};
schtInitLogFile();
......@@ -537,59 +555,61 @@ TEST(queryTest, normalCase) {
schtSetExecNode();
schtSetAsyncSendMsgToServer();
code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &pJob);
code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &job);
ASSERT_EQ(code, 0);
SSchJob *job = (SSchJob *)pJob;
void *pIter = taosHashIterate(job->execTasks, NULL);
SSchJob *pJob = schAcquireJob(job);
void *pIter = taosHashIterate(pJob->execTasks, NULL);
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
SQueryTableRsp rsp = {0};
code = schHandleResponseMsg(job, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0);
pIter = taosHashIterate(job->execTasks, pIter);
pIter = taosHashIterate(pJob->execTasks, pIter);
}
pIter = taosHashIterate(job->execTasks, NULL);
pIter = taosHashIterate(pJob->execTasks, NULL);
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
SResReadyRsp rsp = {0};
code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
printf("code:%d", code);
ASSERT_EQ(code, 0);
pIter = taosHashIterate(job->execTasks, pIter);
pIter = taosHashIterate(pJob->execTasks, pIter);
}
pIter = taosHashIterate(job->execTasks, NULL);
pIter = taosHashIterate(pJob->execTasks, NULL);
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
SQueryTableRsp rsp = {0};
code = schHandleResponseMsg(job, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0);
pIter = taosHashIterate(job->execTasks, pIter);
pIter = taosHashIterate(pJob->execTasks, pIter);
}
pIter = taosHashIterate(job->execTasks, NULL);
pIter = taosHashIterate(pJob->execTasks, NULL);
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
SResReadyRsp rsp = {0};
code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0);
pIter = taosHashIterate(job->execTasks, pIter);
pIter = taosHashIterate(pJob->execTasks, pIter);
}
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_t thread1;
pthread_create(&(thread1), &thattr, schtCreateFetchRspThread, job);
pthread_create(&(thread1), &thattr, schtCreateFetchRspThread, &job);
void *data = NULL;
code = schedulerFetchRows(job, &data);
......@@ -603,9 +623,11 @@ TEST(queryTest, normalCase) {
data = NULL;
code = schedulerFetchRows(job, &data);
ASSERT_EQ(code, 0);
ASSERT_TRUE(data);
ASSERT_TRUE(data == NULL);
schReleaseJob(job);
schedulerFreeJob(pJob);
schedulerFreeJob(job);
schtFreeQueryDag(&dag);
......@@ -644,14 +666,14 @@ TEST(insertTest, normalCase) {
pthread_attr_init(&thattr);
pthread_t thread1;
pthread_create(&(thread1), &thattr, schtSendRsp, &pInsertJob);
pthread_create(&(thread1), &thattr, schtSendRsp, &insertJobRefId);
SQueryResult res = {0};
code = schedulerExecJob(mockPointer, qnodeList, &dag, &pInsertJob, "insert into tb values(now,1)", &res);
code = schedulerExecJob(mockPointer, qnodeList, &dag, &insertJobRefId, "insert into tb values(now,1)", &res);
ASSERT_EQ(code, 0);
ASSERT_EQ(res.numOfRows, 20);
schedulerFreeJob(pInsertJob);
schedulerFreeJob(insertJobRefId);
schedulerDestroy();
}
......@@ -684,4 +706,4 @@ int main(int argc, char** argv) {
return RUN_ALL_TESTS();
}
#pragma GCC diagnostic pop
\ No newline at end of file
#pragma GCC diagnostic pop
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册