/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #include "schedulerInt.h" #include "taosmsg.h" #include "query.h" #include "catalog.h" SSchedulerMgmt schMgmt = {0}; int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_fn_t fp) { /* SRequestObj *pRequest = createRequest(pTscObj, fp, param, TSDB_SQL_CONNECT); if (pRequest == NULL) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } SRequestMsgBody body = {0}; buildConnectMsg(pRequest, &body); int64_t transporterId = 0; sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId); tsem_wait(&pRequest->body.rspSem); destroyConnectMsg(&body); if (pRequest->code != TSDB_CODE_SUCCESS) { const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(terrno); printf("failed to connect to server, reason: %s\n\n", errorMsg); destroyRequest(pRequest); taos_close(pTscObj); pTscObj = NULL; } else { tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p", pTscObj->id, pTscObj->connId, pTscObj->pTransporter); destroyRequest(pRequest); } */ } int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) { for (int32_t i = 0; i < job->levelNum; ++i) { SQueryLevel *level = taosArrayGet(job->levels, i); for (int32_t m = 0; m < level->taskNum; ++m) { SQueryTask *task = taosArrayGet(level->subTasks, m); SSubplan *plan = task->plan; int32_t childNum = plan->pChildern ? (int32_t)taosArrayGetSize(plan->pChildern) : 0; int32_t parentNum = plan->pParents ? (int32_t)taosArrayGetSize(plan->pParents) : 0; if (childNum > 0) { task->children = taosArrayInit(childNum, POINTER_BYTES); if (NULL == task->children) { qError("taosArrayInit %d failed", childNum); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } } for (int32_t n = 0; n < childNum; ++n) { SSubplan *child = taosArrayGet(plan->pChildern, n); SQueryTask *childTask = taosHashGet(planToTask, &child, POINTER_BYTES); if (childTask) { qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } if (NULL == taosArrayPush(task->children, &childTask)) { qError("taosArrayPush failed"); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } } if (parentNum > 0) { task->parents = taosArrayInit(parentNum, POINTER_BYTES); if (NULL == task->parents) { qError("taosArrayInit %d failed", parentNum); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } } for (int32_t n = 0; n < parentNum; ++n) { SSubplan *parent = taosArrayGet(plan->pParents, n); SQueryTask *parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES); if (parentTask) { qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } if (NULL == taosArrayPush(task->parents, &parentTask)) { qError("taosArrayPush failed"); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } } } } SQueryLevel *level = taosArrayGet(job->levels, 0); if (level->taskNum > 1) { qError("invalid plan info, level 0, taskNum:%d", level->taskNum); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } SQueryTask *task = taosArrayGet(level->subTasks, 0); if (task->parents && taosArrayGetSize(task->parents) > 0) { qError("invalid plan info, level 0, parentNum:%d", (int32_t)taosArrayGetSize(task->parents)); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } return TSDB_CODE_SUCCESS; } int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { int32_t code = 0; job->queryId = dag->queryId; if (dag->numOfSubplans <= 0) { qError("invalid subplan num:%d", dag->numOfSubplans); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } int32_t levelNum = (int32_t)taosArrayGetSize(dag->pSubplans); if (levelNum <= 0) { qError("invalid level num:%d", levelNum); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } SHashObj *planToTask = taosHashInit(SCHEDULE_DEFAULT_TASK_NUMBER, taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); if (NULL == planToTask) { qError("taosHashInit %d failed", SCHEDULE_DEFAULT_TASK_NUMBER); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } job->levels = taosArrayInit(levelNum, sizeof(SQueryLevel)); if (NULL == job->levels) { qError("taosArrayInit %d failed", levelNum); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } job->levelNum = levelNum; job->levelIdx = levelNum - 1; job->subPlans = dag->pSubplans; SQueryLevel level = {0}; SArray *levelPlans = NULL; int32_t levelPlanNum = 0; level.status = JOB_TASK_STATUS_NOT_START; for (int32_t i = 0; i < levelNum; ++i) { level.level = i; levelPlans = taosArrayGetP(dag->pSubplans, i); if (NULL == levelPlans) { qError("no level plans for level %d", i); SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } levelPlanNum = (int32_t)taosArrayGetSize(levelPlans); if (levelPlanNum <= 0) { qError("invalid level plans number:%d, level:%d", levelPlanNum, i); SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } level.taskNum = levelPlanNum; level.subTasks = taosArrayInit(levelPlanNum, sizeof(SQueryTask)); if (NULL == level.subTasks) { qError("taosArrayInit %d failed", levelPlanNum); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } for (int32_t n = 0; n < levelPlanNum; ++n) { SSubplan *plan = taosArrayGet(levelPlans, n); SQueryTask task = {0}; task.taskId = atomic_add_fetch_64(&schMgmt.taskId, 1); task.plan = plan; task.status = JOB_TASK_STATUS_NOT_START; void *p = taosArrayPush(level.subTasks, &task); if (NULL == p) { qError("taosArrayPush failed"); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &p, POINTER_BYTES)) { qError("taosHashPut failed"); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } } if (NULL == taosArrayPush(job->levels, &level)) { qError("taosArrayPush failed"); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } } SCH_ERR_JRET(schBuildTaskRalation(job, planToTask)); if (planToTask) { taosHashCleanup(planToTask); } return TSDB_CODE_SUCCESS; _return: if (level.subTasks) { taosArrayDestroy(level.subTasks); } if (planToTask) { taosHashCleanup(planToTask); } SCH_RET(code); } int32_t schSetTaskExecEpSet(SQueryJob *job, SEpSet *epSet) { if (epSet->numOfEps >= SCH_MAX_CONDIDATE_EP_NUM) { return TSDB_CODE_SUCCESS; } int32_t qnodeNum = taosArrayGetSize(job->qnodeList); for (int32_t i = 0; i < qnodeNum && epSet->numOfEps < tListLen(epSet->port); ++i) { SEpAddr *addr = taosArrayGet(job->qnodeList, i); strncpy(epSet->fqdn[epSet->numOfEps], addr->fqdn, sizeof(addr->fqdn)); epSet->port[epSet->numOfEps] = addr->port; ++epSet->numOfEps; } for (int32_t i = 0; i < job->dataSrcEps.numOfEps && epSet->numOfEps < tListLen(epSet->port); ++i) { strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i])); epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i]; ++epSet->numOfEps; } return TSDB_CODE_SUCCESS; } int32_t schPushTaskToExecList(SQueryJob *job, SQueryTask *task) { if (0 != taosHashPut(job->execTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) { qError("taosHashPut failed"); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } return TSDB_CODE_SUCCESS; } int32_t schMoveTaskToSuccList(SQueryJob *job, SQueryTask *task, bool *moved) { if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) { qWarn("remove task[%"PRIx64"] from execTasks failed", task->taskId); return TSDB_CODE_SUCCESS; } if (0 != taosHashPut(job->execTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) { qError("taosHashPut failed"); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } *moved = true; return TSDB_CODE_SUCCESS; } int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { int32_t msgSize = 0; void *msg = NULL; switch (msgType) { case TSDB_MSG_TYPE_QUERY: { if (NULL == task->msg) { qError("query msg is NULL"); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } int32_t len = strlen(task->msg); msgSize = sizeof(SSchedulerQueryMsg) + len; msg = calloc(1, msgSize); if (NULL == msg) { qError("calloc %d failed", msgSize); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } SSchedulerQueryMsg *pMsg = msg; pMsg->schedulerId = htobe64(schMgmt.schedulerId); pMsg->queryId = htobe64(job->queryId); pMsg->taskId = htobe64(task->taskId); pMsg->contentLen = htonl(len); memcpy(pMsg->msg, task->msg, len); break; } case TSDB_MSG_TYPE_RES_READY: { msgSize = sizeof(SSchedulerReadyMsg); msg = calloc(1, msgSize); if (NULL == msg) { qError("calloc %d failed", msgSize); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } SSchedulerReadyMsg *pMsg = msg; pMsg->queryId = htobe64(job->queryId); pMsg->taskId = htobe64(task->taskId); break; } case TSDB_MSG_TYPE_FETCH: { msgSize = sizeof(SSchedulerFetchMsg); msg = calloc(1, msgSize); if (NULL == msg) { qError("calloc %d failed", msgSize); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } SSchedulerFetchMsg *pMsg = msg; pMsg->queryId = htobe64(job->queryId); pMsg->taskId = htobe64(task->taskId); break; } default: qError("unknown msg type:%d", msgType); break; } //TODO SEND MSG return TSDB_CODE_SUCCESS; } int32_t schTaskCheckAndSetRetry(SQueryJob *job, SQueryTask *task, int32_t errCode, bool *needRetry) { // TODO set retry or not based on task type/errCode/retry times/job status/available eps... // TODO if needRetry, set task retry info *needRetry = false; return TSDB_CODE_SUCCESS; } int32_t schFetchFromRemote(SQueryJob *job) { int32_t code = 0; if (atomic_val_compare_exchange_32(&job->remoteFetch, 0, 1) != 0) { qInfo("prior fetching not finished"); return TSDB_CODE_SUCCESS; } SCH_ERR_JRET(schAsyncSendMsg(job, NULL, TSDB_MSG_TYPE_FETCH)); return TSDB_CODE_SUCCESS; _return: atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0); return code; } int32_t schProcessOnJobSuccess(SQueryJob *job) { job->status = JOB_TASK_STATUS_SUCCEED; if (job->userFetch) { SCH_ERR_RET(schFetchFromRemote(job)); } return TSDB_CODE_SUCCESS; } int32_t schProcessOnJobFailure(SQueryJob *job) { job->status = JOB_TASK_STATUS_FAILED; atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0); if (job->userFetch) { tsem_post(&job->rspSem); } return TSDB_CODE_SUCCESS; } int32_t schProcessOnDataFetched(SQueryJob *job) { atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0); tsem_post(&job->rspSem); } int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { bool moved = false; SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved)); if (!moved) { SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status); return TSDB_CODE_SUCCESS; } task->status = JOB_TASK_STATUS_SUCCEED; int32_t parentNum = (int32_t)taosArrayGetSize(task->parents); if (parentNum == 0) { if (task->plan->level != 0) { qError("level error"); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } strncpy(job->resEp.fqdn, task->execAddr.fqdn, sizeof(job->resEp.fqdn)); job->resEp.port = task->execAddr.port; SCH_ERR_RET(schProcessOnJobSuccess(job)); return TSDB_CODE_SUCCESS; } if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CONDIDATE_EP_NUM) { strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn)); job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port; ++job->dataSrcEps.numOfEps; } for (int32_t i = 0; i < parentNum; ++i) { SQueryTask *par = taosArrayGet(task->parents, i); ++par->childReady; SCH_ERR_RET(qSetSubplanExecutionNode(par->plan, task->plan->id.templateId, &task->execAddr)); if (SCH_TASK_READY_TO_LUNCH(par)) { SCH_ERR_RET(schLaunchTask(job, task)); } } return TSDB_CODE_SUCCESS; } int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCode) { bool needRetry = false; SCH_ERR_RET(schTaskCheckAndSetRetry(job, task, errCode, &needRetry)); if (!needRetry) { SCH_TASK_ERR_LOG("task failed[%x], no more retry", errCode); job->status = JOB_TASK_STATUS_FAILED; SCH_ERR_RET(schProcessOnJobFailure(job)); return TSDB_CODE_SUCCESS; } SCH_ERR_RET(schLaunchTask(job, task)); return TSDB_CODE_SUCCESS; } int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, int32_t rspCode) { int32_t code = 0; switch (msgType) { case TSDB_MSG_TYPE_QUERY: if (rspCode != TSDB_CODE_SUCCESS) { SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode)); } else { code = schAsyncSendMsg(job, task, TSDB_MSG_TYPE_RES_READY); if (code) { goto _task_error; } } break; case TSDB_MSG_TYPE_RES_READY: if (rspCode != TSDB_CODE_SUCCESS) { SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode)); } else { code = schProcessOnTaskSuccess(job, task); if (code) { goto _task_error; } } break; case TSDB_MSG_TYPE_FETCH: SCH_ERR_JRET(rspCode); SCH_ERR_JRET(schProcessOnDataFetched(job)); break; default: qError("unknown msg type:%d received", msgType); return TSDB_CODE_QRY_INVALID_INPUT; } return TSDB_CODE_SUCCESS; _task_error: SCH_ERR_JRET(schProcessOnTaskFailure(job, task, code)); return TSDB_CODE_SUCCESS; _return: code = schProcessOnJobFailure(job); return code; } int32_t schLaunchTask(SQueryJob *job, SQueryTask *task) { SSubplan *plan = task->plan; int32_t len = 0; SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &len)); if (plan->execEpSet.numOfEps <= 0) { SCH_ERR_RET(schSetTaskExecEpSet(job, &plan->execEpSet)); } if (plan->execEpSet.numOfEps <= 0) { SCH_TASK_ERR_LOG("invalid execEpSet num:%d", plan->execEpSet.numOfEps); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } SCH_ERR_RET(schAsyncSendMsg(job, task, TSDB_MSG_TYPE_QUERY)); SCH_ERR_RET(schPushTaskToExecList(job, task)); task->status = JOB_TASK_STATUS_EXECUTING; return TSDB_CODE_SUCCESS; } int32_t schLaunchJob(SQueryJob *job) { SQueryLevel *level = taosArrayGet(job->levels, job->levelIdx); for (int32_t i = 0; i < level->taskNum; ++i) { SQueryTask *task = taosArrayGet(level->subTasks, i); SCH_ERR_RET(schLaunchTask(job, task)); } job->status = JOB_TASK_STATUS_EXECUTING; return TSDB_CODE_SUCCESS; } int32_t schedulerInit(SSchedulerCfg *cfg) { if (cfg) { schMgmt.cfg = *cfg; } else { schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER; } schMgmt.Jobs = taosHashInit(schMgmt.cfg.maxJobNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); if (NULL == schMgmt.Jobs) { SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", schMgmt.cfg.maxJobNum); } schMgmt.schedulerId = 1; //TODO GENERATE A UUID return TSDB_CODE_SUCCESS; } int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob) { if (NULL == transport || NULL == transport ||NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } if (taosArrayGetSize(qnodeList) <= 0) { qInfo("qnodeList is empty"); } int32_t code = 0; SQueryJob *job = calloc(1, sizeof(SQueryJob)); if (NULL == job) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } job->transport = transport; job->qnodeList = qnodeList; SCH_ERR_JRET(schValidateAndBuildJob(pDag, job)); job->execTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); if (NULL == job->execTasks) { qError("taosHashInit %d failed", pDag->numOfSubplans); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } job->succTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); if (NULL == job->succTasks) { qError("taosHashInit %d failed", pDag->numOfSubplans); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } tsem_init(&job->rspSem, 0, 0); if (0 != taosHashPut(schMgmt.Jobs, &job->queryId, sizeof(job->queryId), &job, POINTER_BYTES)) { qError("taosHashPut queryId:%"PRIx64" failed", job->queryId); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); } job->status = JOB_TASK_STATUS_NOT_START; SCH_ERR_JRET(schLaunchJob(job)); *(SQueryJob **)pJob = job; return TSDB_CODE_SUCCESS; _return: *(SQueryJob **)pJob = NULL; scheduleFreeJob(job); SCH_RET(code); } int32_t scheduleFetchRows(void *pJob, void **data) { if (NULL == pJob || NULL == data) { return TSDB_CODE_QRY_INVALID_INPUT; } SQueryJob *job = pJob; int32_t code = 0; if (atomic_val_compare_exchange_32(&job->userFetch, 0, 1) != 0) { qError("prior fetching not finished"); return TSDB_CODE_QRY_APP_ERROR; } if (job->status == JOB_TASK_STATUS_SUCCEED) { SCH_ERR_JRET(schFetchFromRemote(job)); } tsem_wait(&job->rspSem); *data = job->res; job->res = NULL; _return: atomic_val_compare_exchange_32(&job->userFetch, 1, 0); return code; } int32_t scheduleCancelJob(void *pJob) { //TODO return TSDB_CODE_SUCCESS; } void scheduleFreeJob(void *pJob) { if (NULL == pJob) { return; } SQueryJob *job = pJob; if (job->status > 0) { if (0 != taosHashRemove(schMgmt.Jobs, &job->queryId, sizeof(job->queryId))) { qError("remove job:%"PRIx64"from mgmt failed", job->queryId); // maybe already freed return; } if (job->status == JOB_TASK_STATUS_EXECUTING) { scheduleCancelJob(pJob); } } //TODO free job } void schedulerDestroy(void) { if (schMgmt.Jobs) { taosHashCleanup(schMgmt.Jobs); //TODO schMgmt.Jobs = NULL; } }