From eced27c4f2297e8781c95c23a0e466c746a0e9e7 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 15 Mar 2022 15:54:32 +0800 Subject: [PATCH] add uuid --- include/util/tuuid.h | 39 ++ source/dnode/snode/inc/sndInt.h | 14 +- source/dnode/snode/src/snode.c | 11 + source/libs/scheduler/src/scheduler.c | 673 +++++++++++++------------- source/util/src/tuuid.c | 59 +++ 5 files changed, 447 insertions(+), 349 deletions(-) create mode 100644 include/util/tuuid.h create mode 100644 source/util/src/tuuid.c diff --git a/include/util/tuuid.h b/include/util/tuuid.h new file mode 100644 index 0000000000..315c2ad497 --- /dev/null +++ b/include/util/tuuid.h @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include "os.h" +#include "taoserror.h" +#include "thash.h" + +/** + * Generate an non-negative signed 32bit id + *+------------+-----+-----------+---------------+ + *| uid|localIp| PId | timestamp | serial number | + *+------------+-----+-----------+---------------+ + *| 6bit |6bit | 12bit | 8bit | + *+------------+-----+-----------+---------------+ + * @return + */ +int32_t tGenIdPI32(void); + +/** + * Generate an non-negative signed 64bit id + *+------------+-----+-----------+---------------+ + *| uid|localIp| PId | timestamp | serial number | + *+------------+-----+-----------+---------------+ + *| 12bit |12bit|24bit |16bit | + *+------------+-----+-----------+---------------+ + * @return + */ +int64_t tGenIdPI64(void); diff --git a/source/dnode/snode/inc/sndInt.h b/source/dnode/snode/inc/sndInt.h index 5c792c840d..ffe691aeb4 100644 --- a/source/dnode/snode/inc/sndInt.h +++ b/source/dnode/snode/inc/sndInt.h @@ -54,18 +54,18 @@ typedef struct SSnode { typedef struct { int64_t streamId; + int32_t taskId; int32_t IdxInLevel; int32_t level; -} SStreamInfo; +} SStreamTaskInfo; typedef struct { - SStreamInfo meta; - int8_t status; - void* executor; - STaosQueue* queue; - void* stateStore; + SStreamTaskInfo meta; + int8_t status; + void* executor; + void* stateStore; // storage handle -} SStreamRunner; +} SStreamTask; int32_t sndCreateStream(); int32_t sndDropStream(); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 91008dd03a..74e41d45c5 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -14,6 +14,7 @@ */ #include "sndInt.h" +#include "tuuid.h" SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { SSnode *pSnode = calloc(1, sizeof(SSnode)); @@ -32,6 +33,16 @@ int32_t sndProcessMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { void sndDestroy(const char *path) {} +static int32_t sndDeployTask(SSnode *pSnode, SRpcMsg *pMsg) { + SStreamTask *task = malloc(sizeof(SStreamTask)); + if (task == NULL) { + return -1; + } + task->meta.taskId = tGenIdPI32(); + taosHashPut(pSnode->pMeta->pHash, &task->meta.taskId, sizeof(int32_t), &task, sizeof(void *)); + return 0; +} + int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) { // stream deployment // stream stop/resume diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index ebe70ca401..6b1ca25d93 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -13,26 +13,21 @@ * along with this program. If not, see . */ +#include "catalog.h" +#include "query.h" #include "schedulerInt.h" #include "tmsg.h" -#include "query.h" -#include "catalog.h" #include "tref.h" SSchedulerMgmt schMgmt = {0}; -FORCE_INLINE SSchJob *schAcquireJob(int64_t refId) { - return (SSchJob *)taosAcquireRef(schMgmt.jobRef, refId); -} +FORCE_INLINE SSchJob *schAcquireJob(int64_t refId) { return (SSchJob *)taosAcquireRef(schMgmt.jobRef, refId); } -FORCE_INLINE int32_t schReleaseJob(int64_t refId) { - return taosReleaseRef(schMgmt.jobRef, refId); -} +FORCE_INLINE int32_t schReleaseJob(int64_t refId) { return taosReleaseRef(schMgmt.jobRef, refId); } -uint64_t schGenTaskId(void) { - return atomic_add_fetch_64(&schMgmt.taskId, 1); -} +uint64_t schGenTaskId(void) { return atomic_add_fetch_64(&schMgmt.taskId, 1); } +#if 0 uint64_t schGenUUID(void) { static uint64_t hashId = 0; static int32_t requestSerialId = 0; @@ -54,11 +49,11 @@ uint64_t schGenUUID(void) { uint64_t id = ((hashId & 0x0FFF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF); return id; } +#endif - -int32_t schInitTask(SSchJob* pJob, SSchTask *pTask, SSubplan* pPlan, SSchLevel *pLevel) { - pTask->plan = pPlan; - pTask->level = pLevel; +int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) { + pTask->plan = pPlan; + pTask->level = pLevel; SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START); pTask->taskId = schGenTaskId(); pTask->execAddrs = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SQueryNodeAddr)); @@ -70,7 +65,7 @@ int32_t schInitTask(SSchJob* pJob, SSchTask *pTask, SSubplan* pPlan, SSchLevel * return TSDB_CODE_SUCCESS; } -void schFreeTask(SSchTask* pTask) { +void schFreeTask(SSchTask *pTask) { if (pTask->candidateAddrs) { taosArrayDestroy(pTask->candidateAddrs); } @@ -90,22 +85,20 @@ void schFreeTask(SSchTask* pTask) { } } - static FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) { int8_t status = SCH_GET_JOB_STATUS(pJob); if (pStatus) { *pStatus = status; } - return (status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED - || status == JOB_TASK_STATUS_CANCELLING || status == JOB_TASK_STATUS_DROPPING - || status == JOB_TASK_STATUS_SUCCEED); + return (status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED || + status == JOB_TASK_STATUS_CANCELLING || status == JOB_TASK_STATUS_DROPPING || + status == JOB_TASK_STATUS_SUCCEED); } - int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { int32_t lastMsgType = SCH_GET_TASK_LASTMSG_TYPE(pTask); - + switch (msgType) { case TDMT_VND_CREATE_TABLE_RSP: case TDMT_VND_SUBMIT_RSP: @@ -114,19 +107,22 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m case TDMT_VND_FETCH_RSP: case TDMT_VND_DROP_TASK: if (lastMsgType != (msgType - 1)) { - SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType), TMSG_INFO(msgType)); + SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType), + TMSG_INFO(msgType)); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } - if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING && SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_PARTIAL_SUCCEED) { - SCH_TASK_ELOG("rsp msg conflicted with task status, status:%d, rspType:%s", SCH_GET_TASK_STATUS(pTask), TMSG_INFO(msgType)); + if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING && + SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_PARTIAL_SUCCEED) { + SCH_TASK_ELOG("rsp msg conflicted with task status, status:%d, rspType:%s", SCH_GET_TASK_STATUS(pTask), + TMSG_INFO(msgType)); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } break; default: SCH_TASK_ELOG("unknown rsp msg, type:%s, status:%d", TMSG_INFO(msgType), SCH_GET_TASK_STATUS(pTask)); - + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -135,7 +131,6 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m return TSDB_CODE_SUCCESS; } - int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) { int32_t code = 0; @@ -147,37 +142,34 @@ int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) { if (oriStatus == newStatus) { SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } - + switch (oriStatus) { case JOB_TASK_STATUS_NULL: if (newStatus != JOB_TASK_STATUS_NOT_START) { SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } - + break; case JOB_TASK_STATUS_NOT_START: if (newStatus != JOB_TASK_STATUS_EXECUTING) { SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } - + break; case JOB_TASK_STATUS_EXECUTING: - if (newStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED - && newStatus != JOB_TASK_STATUS_FAILED - && newStatus != JOB_TASK_STATUS_CANCELLING - && newStatus != JOB_TASK_STATUS_CANCELLED - && newStatus != JOB_TASK_STATUS_DROPPING) { + if (newStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED && newStatus != JOB_TASK_STATUS_FAILED && + newStatus != JOB_TASK_STATUS_CANCELLING && newStatus != JOB_TASK_STATUS_CANCELLED && + newStatus != JOB_TASK_STATUS_DROPPING) { SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } - + break; case JOB_TASK_STATUS_PARTIAL_SUCCEED: - if (newStatus != JOB_TASK_STATUS_FAILED - && newStatus != JOB_TASK_STATUS_SUCCEED - && newStatus != JOB_TASK_STATUS_DROPPING) { + if (newStatus != JOB_TASK_STATUS_FAILED && newStatus != JOB_TASK_STATUS_SUCCEED && + newStatus != JOB_TASK_STATUS_DROPPING) { SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } - + break; case JOB_TASK_STATUS_SUCCEED: case JOB_TASK_STATUS_FAILED: @@ -185,13 +177,13 @@ int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) { if (newStatus != JOB_TASK_STATUS_DROPPING) { SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } - + break; case JOB_TASK_STATUS_CANCELLED: case JOB_TASK_STATUS_DROPPING: SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED); break; - + default: SCH_JOB_ELOG("invalid job status:%d", oriStatus); SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); @@ -211,27 +203,26 @@ int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) { _return: SCH_JOB_ELOG("invalid job status update, from %d to %d", oriStatus, newStatus); - + SCH_ERR_RET(code); } - int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { for (int32_t i = 0; i < pJob->levelNum; ++i) { SSchLevel *pLevel = taosArrayGet(pJob->levels, i); - + for (int32_t m = 0; m < pLevel->taskNum; ++m) { SSchTask *pTask = taosArrayGet(pLevel->subTasks, m); SSubplan *pPlan = pTask->plan; - int32_t childNum = pPlan->pChildren ? (int32_t)LIST_LENGTH(pPlan->pChildren) : 0; - int32_t parentNum = pPlan->pParents ? (int32_t)LIST_LENGTH(pPlan->pParents) : 0; + int32_t childNum = pPlan->pChildren ? (int32_t)LIST_LENGTH(pPlan->pChildren) : 0; + int32_t parentNum = pPlan->pParents ? (int32_t)LIST_LENGTH(pPlan->pParents) : 0; if (childNum > 0) { if (pJob->levelIdx == pLevel->level) { SCH_JOB_ELOG("invalid query plan, lowest level, childNum:%d", childNum); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } - + pTask->children = taosArrayInit(childNum, POINTER_BYTES); if (NULL == pTask->children) { SCH_TASK_ELOG("taosArrayInit %d children failed", childNum); @@ -240,7 +231,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { } for (int32_t n = 0; n < childNum; ++n) { - SSubplan *child = (SSubplan*)nodesListGetNode(pPlan->pChildren, n); + SSubplan *child = (SSubplan *)nodesListGetNode(pPlan->pChildren, n); SSchTask **childTask = taosHashGet(planToTask, &child, POINTER_BYTES); if (NULL == childTask || NULL == *childTask) { SCH_TASK_ELOG("subplan children relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n); @@ -258,7 +249,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { SCH_TASK_ELOG("invalid task info, level:0, parentNum:%d", parentNum); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } - + pTask->parents = taosArrayInit(parentNum, POINTER_BYTES); if (NULL == pTask->parents) { SCH_TASK_ELOG("taosArrayInit %d parents failed", parentNum); @@ -272,7 +263,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { } for (int32_t n = 0; n < parentNum; ++n) { - SSubplan *parent = (SSubplan*)nodesListGetNode(pPlan->pParents, n); + SSubplan *parent = (SSubplan *)nodesListGetNode(pPlan->pParents, n); SSchTask **parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES); if (NULL == parentTask || NULL == *parentTask) { SCH_TASK_ELOG("subplan parent relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n); @@ -283,7 +274,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { SCH_TASK_ELOG("taosArrayPush parentTask failed, level:%d, taskIdx:%d, childIdx:%d", i, m, n); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - } + } SCH_TASK_DLOG("level:%d, parentNum:%d, childNum:%d", i, parentNum, childNum); } @@ -298,11 +289,11 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { return TSDB_CODE_SUCCESS; } - int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) { SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); if (NULL == addr) { - SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", pTask->candidateIdx, (int32_t)taosArrayGetSize(pTask->candidateAddrs)); + SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", pTask->candidateIdx, + (int32_t)taosArrayGetSize(pTask->candidateAddrs)); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } @@ -311,7 +302,6 @@ int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_SUCCESS; } - int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr) { if (NULL == taosArrayPush(pTask->execAddrs, addr)) { SCH_TASK_ELOG("taosArrayPush addr to execAddr list failed, errno:%d", errno); @@ -321,23 +311,25 @@ int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *ad return TSDB_CODE_SUCCESS; } - int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { int32_t code = 0; pJob->queryId = pDag->queryId; - + if (pDag->numOfSubplans <= 0) { SCH_JOB_ELOG("invalid subplan num:%d", pDag->numOfSubplans); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - + int32_t levelNum = (int32_t)LIST_LENGTH(pDag->pSubplans); if (levelNum <= 0) { SCH_JOB_ELOG("invalid level num:%d", levelNum); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - SHashObj *planToTask = taosHashInit(SCHEDULE_DEFAULT_MAX_TASK_NUM, taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + SHashObj *planToTask = taosHashInit( + SCHEDULE_DEFAULT_MAX_TASK_NUM, + taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false, + HASH_NO_LOCK); if (NULL == planToTask) { SCH_JOB_ELOG("taosHashInit %d failed", SCHEDULE_DEFAULT_MAX_TASK_NUM); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -354,10 +346,10 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { pJob->subPlans = pDag->pSubplans; - SSchLevel level = {0}; + SSchLevel level = {0}; SNodeListNode *plans = NULL; - int32_t taskNum = 0; - SSchLevel *pLevel = NULL; + int32_t taskNum = 0; + SSchLevel *pLevel = NULL; level.status = JOB_TASK_STATUS_NOT_START; @@ -369,8 +361,8 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { pLevel = taosArrayGet(pJob->levels, i); pLevel->level = i; - - plans = (SNodeListNode*)nodesListGetNode(pDag->pSubplans, i); + + plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, i); if (NULL == plans) { SCH_JOB_ELOG("empty level plan, level:%d", i); SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); @@ -383,15 +375,15 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { } pLevel->taskNum = taskNum; - + pLevel->subTasks = taosArrayInit(taskNum, sizeof(SSchTask)); if (NULL == pLevel->subTasks) { SCH_JOB_ELOG("taosArrayInit %d failed", taskNum); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - + for (int32_t n = 0; n < taskNum; ++n) { - SSubplan *plan = (SSubplan*)nodesListGetNode(plans->pNodeList, n); + SSubplan *plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n); SCH_SET_JOB_TYPE(pJob, plan->subplanType); @@ -399,13 +391,13 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { SSchTask *pTask = &task; SCH_ERR_JRET(schInitTask(pJob, &task, plan, pLevel)); - + void *p = taosArrayPush(pLevel->subTasks, &task); if (NULL == p) { SCH_TASK_ELOG("taosArrayPush task to level failed, level:%d, taskIdx:%d", pLevel->level, n); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - + if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &p, POINTER_BYTES)) { SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d", n); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -452,10 +444,10 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { int32_t nodeNum = 0; if (pJob->nodeList) { nodeNum = taosArrayGetSize(pJob->nodeList); - + for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) { SQueryNodeAddr *naddr = taosArrayGet(pJob->nodeList, i); - + if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) { SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -470,14 +462,14 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_QRY_INVALID_INPUT; } -/* - for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) { - strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i])); - epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i]; - - ++epSet->numOfEps; - } -*/ + /* + for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) { + strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i])); + epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i]; + + ++epSet->numOfEps; + } + */ return TSDB_CODE_SUCCESS; } @@ -489,7 +481,7 @@ int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) { SCH_TASK_ELOG("task already in execTask list, code:%x", code); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } - + SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -510,11 +502,11 @@ int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) { if (0 != code) { if (HASH_NODE_EXIST(code)) { *moved = true; - + SCH_TASK_ELOG("task already in succTask list, status:%d", SCH_GET_TASK_STATUS(pTask)); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } - + SCH_TASK_ELOG("taosHashPut task to succTask list failed, errno:%d", errno); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -522,13 +514,13 @@ int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) { *moved = true; SCH_TASK_DLOG("task moved to succTask list, numOfTasks:%d", taosHashGetSize(pJob->succTasks)); - + return TSDB_CODE_SUCCESS; } int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) { *moved = false; - + if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) { SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%d", SCH_GET_TASK_STATUS(pTask)); } @@ -537,11 +529,11 @@ int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) { if (0 != code) { if (HASH_NODE_EXIST(code)) { *moved = true; - + SCH_TASK_WLOG("task already in failTask list, status:%d", SCH_GET_TASK_STATUS(pTask)); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } - + SCH_TASK_ELOG("taosHashPut task to failTask list failed, errno:%d", errno); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -549,11 +541,10 @@ int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) { *moved = true; SCH_TASK_DLOG("task moved to failTask list, numOfTasks:%d", taosHashGetSize(pJob->failTasks)); - + return TSDB_CODE_SUCCESS; } - int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) { if (0 != taosHashRemove(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId))) { SCH_TASK_WLOG("remove task from succTask list failed, may not exist, status:%d", SCH_GET_TASK_STATUS(pTask)); @@ -563,11 +554,11 @@ int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) { if (0 != code) { if (HASH_NODE_EXIST(code)) { *moved = true; - + SCH_TASK_ELOG("task already in execTask list, status:%d", SCH_GET_TASK_STATUS(pTask)); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } - + SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -575,11 +566,10 @@ int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) { *moved = true; SCH_TASK_DLOG("task moved to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks)); - + return TSDB_CODE_SUCCESS; } - int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) { // TODO set retry or not based on task type/errCode/retry times/job status/available eps... @@ -587,20 +577,17 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo return TSDB_CODE_SUCCESS; - //TODO CHECK epList/condidateList + // TODO CHECK epList/condidateList if (SCH_IS_DATA_SRC_TASK(pTask)) { - } else { int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs); - + if ((pTask->candidateIdx + 1) >= candidateNum) { return TSDB_CODE_SUCCESS; } ++pTask->candidateIdx; } - - } int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { @@ -623,9 +610,9 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { } int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchHbTrans *trans) { - int32_t code = 0; + int32_t code = 0; SSchHbTrans *hb = NULL; - + while (true) { hb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId)); if (NULL == hb) { @@ -639,9 +626,11 @@ int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchHbTrans *trans) { SCH_ERR_RET(code); } - qDebug("hb connection updated, seqId:%" PRIx64 ", sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, instance:%p, connection:%p", - trans->seqId, schMgmt.sId, epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->trans.transInst, trans->trans.transHandle); - + qDebug("hb connection updated, seqId:%" PRIx64 ", sId:%" PRIx64 + ", nodeId:%d, fqdn:%s, port:%d, instance:%p, connection:%p", + trans->seqId, schMgmt.sId, epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->trans.transInst, + trans->trans.transHandle); + return TSDB_CODE_SUCCESS; } @@ -649,11 +638,11 @@ int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchHbTrans *trans) { } SCH_LOCK(SCH_WRITE, &hb->lock); - + if (hb->seqId >= trans->seqId) { - qDebug("hb trans seqId is old, seqId:%" PRId64 ", currentId:%" PRId64 ", nodeId:%d, fqdn:%s, port:%d", - trans->seqId, hb->seqId, epId->nodeId, epId->ep.fqdn, epId->ep.port); - + qDebug("hb trans seqId is old, seqId:%" PRId64 ", currentId:%" PRId64 ", nodeId:%d, fqdn:%s, port:%d", trans->seqId, + hb->seqId, epId->nodeId, epId->ep.fqdn, epId->ep.port); + SCH_UNLOCK(SCH_WRITE, &hb->lock); return TSDB_CODE_SUCCESS; } @@ -663,16 +652,18 @@ int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchHbTrans *trans) { SCH_UNLOCK(SCH_WRITE, &hb->lock); - qDebug("hb connection updated, seqId:%" PRIx64 ", sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, instance:%p, connection:%p", - trans->seqId, schMgmt.sId, epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->trans.transInst, trans->trans.transHandle); - + qDebug("hb connection updated, seqId:%" PRIx64 ", sId:%" PRIx64 + ", nodeId:%d, fqdn:%s, port:%d, instance:%p, connection:%p", + trans->seqId, schMgmt.sId, epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->trans.transInst, + trans->trans.transHandle); + return TSDB_CODE_SUCCESS; } int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCode) { // if already FAILED, no more processing SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, status)); - + if (errCode) { atomic_store_32(&pJob->errCode, errCode); } @@ -684,11 +675,10 @@ int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCod int32_t code = atomic_load_32(&pJob->errCode); SCH_JOB_DLOG("job failed with error: %s", tstrerror(code)); - + SCH_RET(code); } - // Note: no more task error processing, handled in function internal int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) { SCH_RET(schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_FAILED, errCode)); @@ -699,18 +689,16 @@ int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode) { SCH_RET(schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_DROPPING, errCode)); } - - // Note: no more task error processing, handled in function internal int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) { int32_t code = 0; - + SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_PARTIAL_SUCCEED)); if (pJob->attr.syncSchedule) { tsem_post(&pJob->rspSem); } - + if (atomic_load_8(&pJob->userFetch)) { SCH_ERR_JRET(schFetchFromRemote(pJob)); } @@ -730,22 +718,22 @@ int32_t schProcessOnDataFetched(SSchJob *job) { // Note: no more task error processing, handled in function internal int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) { int8_t status = 0; - + if (schJobNeedToStop(pJob, &status)) { SCH_TASK_DLOG("task failed not processed cause of job status, job status:%d", status); - + SCH_RET(atomic_load_32(&pJob->errCode)); } - bool needRetry = false; - bool moved = false; + bool needRetry = false; + bool moved = false; int32_t taskDone = 0; int32_t code = 0; SCH_TASK_DLOG("taskOnFailure, code:%s", tstrerror(errCode)); - + SCH_ERR_JRET(schTaskCheckSetRetry(pJob, pTask, errCode, &needRetry)); - + if (!needRetry) { SCH_TASK_ELOG("task failed and no more retry, code:%s", tstrerror(errCode)); @@ -757,7 +745,7 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) } SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAILED); - + if (SCH_TASK_NEED_WAIT_ALL(pTask)) { SCH_LOCK(SCH_WRITE, &pTask->level->lock); pTask->level->taskFailed++; @@ -765,7 +753,7 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) SCH_UNLOCK(SCH_WRITE, &pTask->level->lock); atomic_store_32(&pJob->errCode, errCode); - + if (taskDone < pTask->level->taskNum) { SCH_TASK_DLOG("not all tasks done, done:%d, all:%d", taskDone, pTask->level->taskNum); SCH_ERR_RET(errCode); @@ -773,7 +761,7 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) } } else { SCH_ERR_JRET(schHandleTaskRetry(pJob, pTask)); - + return TSDB_CODE_SUCCESS; } @@ -784,7 +772,7 @@ _return: // Note: no more task error processing, handled in function internal int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { - bool moved = false; + bool moved = false; int32_t code = 0; SCH_TASK_DLOG("taskOnSuccess, status:%d", SCH_GET_TASK_STATUS(pTask)); @@ -796,17 +784,17 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { SCH_ERR_JRET(schRecordTaskSucceedNode(pJob, pTask)); SCH_ERR_JRET(schLaunchTasksInFlowCtrlList(pJob, pTask)); - + int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0; if (parentNum == 0) { int32_t taskDone = 0; - + if (SCH_TASK_NEED_WAIT_ALL(pTask)) { SCH_LOCK(SCH_WRITE, &pTask->level->lock); pTask->level->taskSucceed++; taskDone = pTask->level->taskSucceed + pTask->level->taskFailed; SCH_UNLOCK(SCH_WRITE, &pTask->level->lock); - + if (taskDone < pTask->level->taskNum) { SCH_TASK_DLOG("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum); return TSDB_CODE_SUCCESS; @@ -826,28 +814,31 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { pJob->fetchTask = pTask; SCH_ERR_JRET(schMoveTaskToExecList(pJob, pTask, &moved)); - + SCH_RET(schProcessOnJobPartialSuccess(pJob)); } -/* - if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CANDIDATE_EP_NUM) { - strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn)); - job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port; + /* + if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CANDIDATE_EP_NUM) { + strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn)); + job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port; - ++job->dataSrcEps.numOfEps; - } -*/ + ++job->dataSrcEps.numOfEps; + } + */ for (int32_t i = 0; i < parentNum; ++i) { SSchTask *par = *(SSchTask **)taosArrayGet(pTask->parents, i); - int32_t readyNum = atomic_add_fetch_32(&par->childReady, 1); + int32_t readyNum = atomic_add_fetch_32(&par->childReady, 1); SCH_LOCK(SCH_WRITE, &par->lock); - SDownstreamSourceNode source = {.type = QUERY_NODE_DOWNSTREAM_SOURCE, .taskId = pTask->taskId, .schedId = schMgmt.sId, .addr = pTask->succeedAddr}; + SDownstreamSourceNode source = {.type = QUERY_NODE_DOWNSTREAM_SOURCE, + .taskId = pTask->taskId, + .schedId = schMgmt.sId, + .addr = pTask->succeedAddr}; qSetSubplanExecutionNode(par->plan, pTask->plan->id.groupId, &source); SCH_UNLOCK(SCH_WRITE, &par->lock); - + if (SCH_TASK_READY_TO_LUNCH(readyNum, par)) { SCH_ERR_RET(schLaunchTaskImpl(pJob, par)); } @@ -860,11 +851,10 @@ _return: SCH_RET(schProcessOnJobFailure(pJob, code)); } - // Note: no more error processing, handled in function internal int32_t schFetchFromRemote(SSchJob *pJob) { int32_t code = 0; - + if (atomic_val_compare_exchange_32(&pJob->remoteFetch, 0, 1) != 0) { SCH_JOB_ELOG("prior fetching not finished, remoteFetch:%d", atomic_load_32(&pJob->remoteFetch)); return TSDB_CODE_SUCCESS; @@ -881,7 +871,7 @@ int32_t schFetchFromRemote(SSchJob *pJob) { SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, TDMT_VND_FETCH)); return TSDB_CODE_SUCCESS; - + _return: atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0); @@ -889,15 +879,15 @@ _return: SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code)); } - // Note: no more task error processing, handled in function internal -int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) { +int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize, + int32_t rspCode) { int32_t code = 0; - int8_t status = 0; - + int8_t status = 0; + if (schJobNeedToStop(pJob, &status)) { SCH_TASK_ELOG("rsp not processed cause of job status, job status:%d", status); - + SCH_RET(atomic_load_32(&pJob->errCode)); } @@ -905,13 +895,13 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch switch (msgType) { case TDMT_VND_CREATE_TABLE_RSP: { - SCH_ERR_JRET(rspCode); - SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); + SCH_ERR_JRET(rspCode); + SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); - break; - } + break; + } case TDMT_VND_SUBMIT_RSP: { - #if 0 //TODO OPEN THIS +#if 0 // TODO OPEN THIS SShellSubmitRspMsg *rsp = (SShellSubmitRspMsg *)msg; if (rspCode != TSDB_CODE_SUCCESS || NULL == msg || rsp->code != TSDB_CODE_SUCCESS) { @@ -919,77 +909,77 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch } pJob->resNumOfRows += rsp->affectedRows; - #else - SCH_ERR_JRET(rspCode); +#else + SCH_ERR_JRET(rspCode); - SSubmitRsp *rsp = (SSubmitRsp *)msg; - if (rsp) { - pJob->resNumOfRows += rsp->affectedRows; - } - #endif + SSubmitRsp *rsp = (SSubmitRsp *)msg; + if (rsp) { + pJob->resNumOfRows += rsp->affectedRows; + } +#endif - SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); + SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); - break; - } + break; + } case TDMT_VND_QUERY_RSP: { - SQueryTableRsp *rsp = (SQueryTableRsp *)msg; - - SCH_ERR_JRET(rspCode); - if (NULL == msg) { - SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); - } - SCH_ERR_JRET(rsp->code); - - SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, TDMT_VND_RES_READY)); - - break; + SQueryTableRsp *rsp = (SQueryTableRsp *)msg; + + SCH_ERR_JRET(rspCode); + if (NULL == msg) { + SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } + SCH_ERR_JRET(rsp->code); + + SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, TDMT_VND_RES_READY)); + + break; + } case TDMT_VND_RES_READY_RSP: { - SResReadyRsp *rsp = (SResReadyRsp *)msg; - - SCH_ERR_JRET(rspCode); - if (NULL == msg) { - SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); - } - SCH_ERR_JRET(rsp->code); - - SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); - - break; + SResReadyRsp *rsp = (SResReadyRsp *)msg; + + SCH_ERR_JRET(rspCode); + if (NULL == msg) { + SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } - case TDMT_VND_FETCH_RSP: { - SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg; + SCH_ERR_JRET(rsp->code); - SCH_ERR_JRET(rspCode); - if (NULL == msg) { - SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); - } - - if (pJob->res) { - SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->res); - tfree(rsp); - SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); - } + SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); + + break; + } + case TDMT_VND_FETCH_RSP: { + SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg; - atomic_store_ptr(&pJob->res, rsp); - atomic_add_fetch_32(&pJob->resNumOfRows, htonl(rsp->numOfRows)); + SCH_ERR_JRET(rspCode); + if (NULL == msg) { + SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); + } - if (rsp->completed) { - SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED); - } + if (pJob->res) { + SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->res); + tfree(rsp); + SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); + } - SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed); + atomic_store_ptr(&pJob->res, rsp); + atomic_add_fetch_32(&pJob->resNumOfRows, htonl(rsp->numOfRows)); - schProcessOnDataFetched(pJob); - break; + if (rsp->completed) { + SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED); } + + SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed); + + schProcessOnDataFetched(pJob); + break; + } case TDMT_VND_DROP_TASK_RSP: { - // SHOULD NEVER REACH HERE - SCH_TASK_ELOG("invalid status to handle drop task rsp, refId:%" PRIx64, pJob->refId); - SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); - break; - } + // SHOULD NEVER REACH HERE + SCH_TASK_ELOG("invalid status to handle drop task rsp, refId:%" PRIx64, pJob->refId); + SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); + break; + } default: SCH_TASK_ELOG("unknown rsp msg, type:%d, status:%d", msgType, SCH_GET_TASK_STATUS(pTask)); SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); @@ -1002,15 +992,15 @@ _return: SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); } - -int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode) { - int32_t code = 0; +int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, int32_t rspCode) { + int32_t code = 0; SSchCallbackParam *pParam = (SSchCallbackParam *)param; - SSchTask *pTask = NULL; - + SSchTask *pTask = NULL; + SSchJob *pJob = schAcquireJob(pParam->refId); if (NULL == pJob) { - qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "taosAcquireRef job failed, may be dropped, refId:%" PRIx64, pParam->queryId, pParam->taskId, pParam->refId); + qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "taosAcquireRef job failed, may be dropped, refId:%" PRIx64, + pParam->queryId, pParam->taskId, pParam->refId); SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED); } @@ -1028,8 +1018,8 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in pTask = *task; SCH_TASK_DLOG("rsp msg received, type:%s, code:%s", TMSG_INFO(msgType), tstrerror(rspCode)); - - pTask->handle = pMsg->handle; + + pTask->handle = pMsg->handle; SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode)); _return: @@ -1042,42 +1032,41 @@ _return: SCH_RET(code); } -int32_t schHandleSubmitCallback(void* param, const SDataBuf* pMsg, int32_t code) { +int32_t schHandleSubmitCallback(void *param, const SDataBuf *pMsg, int32_t code) { return schHandleCallback(param, pMsg, TDMT_VND_SUBMIT_RSP, code); } -int32_t schHandleCreateTableCallback(void* param, const SDataBuf* pMsg, int32_t code) { +int32_t schHandleCreateTableCallback(void *param, const SDataBuf *pMsg, int32_t code) { return schHandleCallback(param, pMsg, TDMT_VND_CREATE_TABLE_RSP, code); } -int32_t schHandleQueryCallback(void* param, const SDataBuf* pMsg, int32_t code) { +int32_t schHandleQueryCallback(void *param, const SDataBuf *pMsg, int32_t code) { return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code); } -int32_t schHandleFetchCallback(void* param, const SDataBuf* pMsg, int32_t code) { +int32_t schHandleFetchCallback(void *param, const SDataBuf *pMsg, int32_t code) { return schHandleCallback(param, pMsg, TDMT_VND_FETCH_RSP, code); } -int32_t schHandleReadyCallback(void* param, const SDataBuf* pMsg, int32_t code) { +int32_t schHandleReadyCallback(void *param, const SDataBuf *pMsg, int32_t code) { return schHandleCallback(param, pMsg, TDMT_VND_RES_READY_RSP, code); } -int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) { +int32_t schHandleDropCallback(void *param, const SDataBuf *pMsg, int32_t code) { SSchCallbackParam *pParam = (SSchCallbackParam *)param; - qDebug("QID:%"PRIx64",TID:%"PRIx64" drop task rsp received, code:%x", pParam->queryId, pParam->taskId, code); + qDebug("QID:%" PRIx64 ",TID:%" PRIx64 " drop task rsp received, code:%x", pParam->queryId, pParam->taskId, code); } - -int32_t schHandleHbCallback(void* param, const SDataBuf* pMsg, int32_t code) { +int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) { if (code) { qError("hb rsp error:%s", tstrerror(code)); SCH_ERR_RET(code); } - + SSchedulerHbRsp rsp = {0}; SSchCallbackParam *pParam = (SSchCallbackParam *)param; - + if (tDeserializeSSchedulerHbRsp(pMsg->pData, pMsg->len, &rsp)) { qError("invalid hb rsp msg, size:%d", pMsg->len); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); @@ -1088,21 +1077,22 @@ int32_t schHandleHbCallback(void* param, const SDataBuf* pMsg, int32_t code) { trans.seqId = rsp.seqId; trans.trans.transInst = pParam->transport; trans.trans.transHandle = pMsg->handle; - + SCH_RET(schUpdateHbConnection(&rsp.epId, &trans)); } int32_t taskNum = (int32_t)taosArrayGetSize(rsp.taskStatus); for (int32_t i = 0; i < taskNum; ++i) { STaskStatus *taskStatus = taosArrayGet(rsp.taskStatus, i); - + SSchJob *pJob = schAcquireJob(taskStatus->refId); if (NULL == pJob) { - qWarn("job not found, refId:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64, taskStatus->refId, taskStatus->queryId, taskStatus->taskId); - //TODO DROP TASK FROM SERVER!!!! + qWarn("job not found, refId:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64, taskStatus->refId, + taskStatus->queryId, taskStatus->taskId); + // TODO DROP TASK FROM SERVER!!!! continue; } - + // TODO schReleaseJob(taskStatus->refId); @@ -1115,22 +1105,21 @@ _return: SCH_RET(code); } - int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { switch (msgType) { case TDMT_VND_CREATE_TABLE: *fp = schHandleCreateTableCallback; break; - case TDMT_VND_SUBMIT: + case TDMT_VND_SUBMIT: *fp = schHandleSubmitCallback; break; - case TDMT_VND_QUERY: + case TDMT_VND_QUERY: *fp = schHandleQueryCallback; break; - case TDMT_VND_RES_READY: + case TDMT_VND_RES_READY: *fp = schHandleReadyCallback; break; - case TDMT_VND_FETCH: + case TDMT_VND_FETCH: *fp = schHandleFetchCallback; break; case TDMT_VND_DROP_TASK: @@ -1147,13 +1136,13 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { return TSDB_CODE_SUCCESS; } - -int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet* epSet, int32_t msgType, void *msg, uint32_t msgSize) { +int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet *epSet, int32_t msgType, void *msg, + uint32_t msgSize) { int32_t code = 0; SSchTrans *trans = (SSchTrans *)transport; - SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); + SMsgSendInfo *pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); if (NULL == pMsgSendInfo) { SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo)); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -1173,15 +1162,14 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet* param->taskId = SCH_TASK_ID(pTask); param->transport = trans->transInst; - pMsgSendInfo->param = param; pMsgSendInfo->msgInfo.pData = msg; pMsgSendInfo->msgInfo.len = msgSize; - pMsgSendInfo->msgInfo.handle = trans->transHandle; + pMsgSendInfo->msgInfo.handle = trans->transHandle; pMsgSendInfo->msgType = msgType; pMsgSendInfo->fp = fp; - - int64_t transporterId = 0; + + int64_t transporterId = 0; code = asyncSendMsgToServer(trans->transInst, epSet, &transporterId, pMsgSendInfo); if (code) { SCH_ERR_JRET(code); @@ -1191,7 +1179,7 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet* return TSDB_CODE_SUCCESS; _return: - + tfree(param); tfree(pMsgSendInfo); SCH_RET(code); @@ -1199,9 +1187,9 @@ _return: int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t msgType) { uint32_t msgSize = 0; - void *msg = NULL; - int32_t code = 0; - bool isCandidateAddr = false; + void *msg = NULL; + int32_t code = 0; + bool isCandidateAddr = false; if (NULL == addr) { addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); isCandidateAddr = true; @@ -1235,13 +1223,13 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, SSubQueryMsg *pMsg = msg; pMsg->header.vgId = htonl(addr->nodeId); - pMsg->sId = htobe64(schMgmt.sId); - pMsg->queryId = htobe64(pJob->queryId); - pMsg->taskId = htobe64(pTask->taskId); - pMsg->refId = htobe64(pJob->refId); - pMsg->taskType = TASK_TYPE_TEMP; - pMsg->phyLen = htonl(pTask->msgLen); - pMsg->sqlLen = htonl(len); + pMsg->sId = htobe64(schMgmt.sId); + pMsg->queryId = htobe64(pJob->queryId); + pMsg->taskId = htobe64(pTask->taskId); + pMsg->refId = htobe64(pJob->refId); + pMsg->taskType = TASK_TYPE_TEMP; + pMsg->phyLen = htonl(pTask->msgLen); + pMsg->sqlLen = htonl(len); memcpy(pMsg->msg, pJob->sql, len); memcpy(pMsg->msg + len, pTask->msg, pTask->msgLen); @@ -1257,12 +1245,12 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, } SResReadyReq *pMsg = msg; - - pMsg->header.vgId = htonl(addr->nodeId); - - pMsg->sId = htobe64(schMgmt.sId); + + pMsg->header.vgId = htonl(addr->nodeId); + + pMsg->sId = htobe64(schMgmt.sId); pMsg->queryId = htobe64(pJob->queryId); - pMsg->taskId = htobe64(pTask->taskId); + pMsg->taskId = htobe64(pTask->taskId); break; } case TDMT_VND_FETCH: { @@ -1272,32 +1260,32 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, SCH_TASK_ELOG("calloc %d failed", msgSize); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - + SResFetchReq *pMsg = msg; - - pMsg->header.vgId = htonl(addr->nodeId); - - pMsg->sId = htobe64(schMgmt.sId); + + pMsg->header.vgId = htonl(addr->nodeId); + + pMsg->sId = htobe64(schMgmt.sId); pMsg->queryId = htobe64(pJob->queryId); - pMsg->taskId = htobe64(pTask->taskId); + pMsg->taskId = htobe64(pTask->taskId); break; } - case TDMT_VND_DROP_TASK:{ + case TDMT_VND_DROP_TASK: { msgSize = sizeof(STaskDropReq); msg = calloc(1, msgSize); if (NULL == msg) { SCH_TASK_ELOG("calloc %d failed", msgSize); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - + STaskDropReq *pMsg = msg; - - pMsg->header.vgId = htonl(addr->nodeId); - - pMsg->sId = htobe64(schMgmt.sId); + + pMsg->header.vgId = htonl(addr->nodeId); + + pMsg->sId = htobe64(schMgmt.sId); pMsg->queryId = htobe64(pJob->queryId); - pMsg->taskId = htobe64(pTask->taskId); - pMsg->refId = htobe64(pJob->refId); + pMsg->taskId = htobe64(pTask->taskId); + pMsg->refId = htobe64(pJob->refId); break; } case TDMT_VND_QUERY_HEARTBEAT: { @@ -1337,24 +1325,24 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, if (isCandidateAddr) { SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr)); } - + return TSDB_CODE_SUCCESS; _return: SCH_SET_TASK_LASTMSG_TYPE(pTask, -1); - + tfree(msg); SCH_RET(code); } int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) { SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); - SQueryNodeEpId epId = {0}; + SQueryNodeEpId epId = {0}; epId.nodeId = addr->nodeId; memcpy(&epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp)); - + SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId)); if (NULL == hb) { SCH_ERR_RET(schBuildAndSendMsg(pJob, NULL, addr, TDMT_VND_QUERY_HEARTBEAT)); @@ -1364,29 +1352,30 @@ int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) { } int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { - int8_t status = 0; + int8_t status = 0; int32_t code = 0; atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1); - + if (schJobNeedToStop(pJob, &status)) { SCH_TASK_DLOG("no need to launch task cause of job status, job status:%d", status); - + SCH_RET(atomic_load_32(&pJob->errCode)); } - + SSubplan *plan = pTask->plan; - if (NULL == pTask->msg) { // TODO add more detailed reason for failure + if (NULL == pTask->msg) { // TODO add more detailed reason for failure code = qSubPlanToString(plan, &pTask->msg, &pTask->msgLen); if (TSDB_CODE_SUCCESS != code) { - SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg, pTask->msgLen); + SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg, + pTask->msgLen); SCH_ERR_RET(code); } else { SCH_TASK_DLOG("physical plan len:%d, %s", pTask->msgLen, pTask->msg); } } - + SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask)); // NOTE: race condition: the task should be put into the hash table before send msg to server @@ -1398,15 +1387,15 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { if (SCH_IS_QUERY_JOB(pJob)) { SCH_ERR_RET(schEnsureHbConnection(pJob, pTask)); } - + SCH_ERR_RET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType)); - + return TSDB_CODE_SUCCESS; } // Note: no more error processing, handled in function internal int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) { - bool enough = false; + bool enough = false; int32_t code = 0; if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) { @@ -1436,11 +1425,9 @@ int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) { return TSDB_CODE_SUCCESS; } - - int32_t schLaunchJob(SSchJob *pJob) { SSchLevel *level = taosArrayGet(pJob->levels, pJob->levelIdx); - + SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_EXECUTING)); SCH_ERR_RET(schCheckJobNeedFlowCtrl(pJob, level)); @@ -1457,7 +1444,7 @@ void schDropTaskOnExecutedNode(SSchJob *pJob, SSchTask *pTask) { } int32_t size = (int32_t)taosArrayGetSize(pTask->execAddrs); - + if (size <= 0) { SCH_TASK_DLOG("task has no exec address, no need to drop it, status:%d", SCH_GET_TASK_STATUS(pTask)); return; @@ -1481,9 +1468,9 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) { if (!SCH_TASK_NO_NEED_DROP(pTask)) { schDropTaskOnExecutedNode(pJob, pTask); } - + pIter = taosHashIterate(list, pIter); - } + } } void schDropJobAllTasks(SSchJob *pJob) { @@ -1493,10 +1480,9 @@ void schDropJobAllTasks(SSchJob *pJob) { } int32_t schCancelJob(SSchJob *pJob) { - //TODO - - //TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST + // TODO + // TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST } void schFreeJobImpl(void *job) { @@ -1506,7 +1492,7 @@ void schFreeJobImpl(void *job) { SSchJob *pJob = job; uint64_t queryId = pJob->queryId; - int64_t refId = pJob->refId; + int64_t refId = pJob->refId; if (pJob->status == JOB_TASK_STATUS_EXECUTING) { schCancelJob(pJob); @@ -1514,55 +1500,55 @@ void schFreeJobImpl(void *job) { schDropJobAllTasks(pJob); - pJob->subPlans = NULL; // it is a reference to pDag->pSubplans - + pJob->subPlans = NULL; // it is a reference to pDag->pSubplans + int32_t numOfLevels = taosArrayGetSize(pJob->levels); - for(int32_t i = 0; i < numOfLevels; ++i) { + for (int32_t i = 0; i < numOfLevels; ++i) { SSchLevel *pLevel = taosArrayGet(pJob->levels, i); schFreeFlowCtrl(pLevel); - + int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks); - for(int32_t j = 0; j < numOfTasks; ++j) { - SSchTask* pTask = taosArrayGet(pLevel->subTasks, j); + for (int32_t j = 0; j < numOfTasks; ++j) { + SSchTask *pTask = taosArrayGet(pLevel->subTasks, j); schFreeTask(pTask); } taosArrayDestroy(pLevel->subTasks); } - + taosHashCleanup(pJob->execTasks); taosHashCleanup(pJob->failTasks); taosHashCleanup(pJob->succTasks); - + taosArrayDestroy(pJob->levels); taosArrayDestroy(pJob->nodeList); tfree(pJob->res); - + tfree(pJob); - qDebug("QID:0x%"PRIx64" job freed, refId:%" PRIx64 ", pointer:%p", queryId, refId, pJob); + qDebug("QID:0x%" PRIx64 " job freed, refId:%" PRIx64 ", pointer:%p", queryId, refId, pJob); } - -static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan* pDag, int64_t *job, const char* sql, bool syncSchedule) { - qDebug("QID:0x%"PRIx64" job started", pDag->queryId); +static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, + bool syncSchedule) { + qDebug("QID:0x%" PRIx64 " job started", pDag->queryId); if (pNodeList == NULL || (pNodeList && taosArrayGetSize(pNodeList) <= 0)) { - qDebug("QID:0x%"PRIx64" input exec nodeList is empty", pDag->queryId); + qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pDag->queryId); } - int32_t code = 0; + int32_t code = 0; SSchJob *pJob = calloc(1, sizeof(SSchJob)); if (NULL == pJob) { - qError("QID:%"PRIx64" calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob)); + qError("QID:%" PRIx64 " calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob)); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } pJob->attr.syncSchedule = syncSchedule; pJob->transport = transport; - pJob->sql = sql; + pJob->sql = sql; if (pNodeList != NULL) { pJob->nodeList = taosArrayDup(pNodeList); @@ -1570,19 +1556,22 @@ static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan* pD SCH_ERR_JRET(schValidateAndBuildJob(pDag, pJob)); - pJob->execTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); + pJob->execTasks = + taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); if (NULL == pJob->execTasks) { SCH_JOB_ELOG("taosHashInit %d execTasks failed", pDag->numOfSubplans); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - pJob->succTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); + pJob->succTasks = + taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); if (NULL == pJob->succTasks) { SCH_JOB_ELOG("taosHashInit %d succTasks failed", pDag->numOfSubplans); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - pJob->failTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); + pJob->failTasks = + taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); if (NULL == pJob->failTasks) { SCH_JOB_ELOG("taosHashInit %d failTasks failed", pDag->numOfSubplans); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -1602,9 +1591,9 @@ static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan* pD SCH_ERR_JRET(schLaunchJob(pJob)); schAcquireJob(pJob->refId); - + *job = pJob->refId; - + if (syncSchedule) { SCH_JOB_DLOG("will wait for rsp now, job status:%d", SCH_GET_JOB_STATUS(pJob)); tsem_wait(&pJob->rspSem); @@ -1613,7 +1602,7 @@ static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan* pD SCH_JOB_DLOG("job exec done, job status:%d", SCH_GET_JOB_STATUS(pJob)); schReleaseJob(pJob->refId); - + return TSDB_CODE_SUCCESS; _return: @@ -1622,7 +1611,6 @@ _return: SCH_RET(code); } - int32_t schedulerInit(SSchedulerCfg *cfg) { if (schMgmt.jobRef) { qError("scheduler already initialized"); @@ -1631,7 +1619,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { if (cfg) { schMgmt.cfg = *cfg; - + if (schMgmt.cfg.maxJobNum == 0) { schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_MAX_JOB_NUM; } @@ -1642,7 +1630,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_MAX_JOB_NUM; schMgmt.cfg.maxNodeTableNum = SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM; } - + schMgmt.jobRef = taosOpenRef(schMgmt.cfg.maxJobNum, schFreeJobImpl); if (schMgmt.jobRef < 0) { qError("init schduler jobRef failed, num:%u", schMgmt.cfg.maxJobNum); @@ -1660,12 +1648,13 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { SCH_ERR_RET(TSDB_CODE_QRY_SYS_ERROR); } - qInfo("scheduler %"PRIx64" initizlized, maxJob:%u", schMgmt.sId, schMgmt.cfg.maxJobNum); - + qInfo("scheduler %" PRIx64 " initizlized, maxJob:%u", schMgmt.sId, schMgmt.cfg.maxJobNum); + return TSDB_CODE_SUCCESS; } -int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan* pDag, int64_t *pJob, const char* sql, SQueryResult *pRes) { +int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, + SQueryResult *pRes) { if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -1676,20 +1665,21 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan* pDag, in pRes->code = atomic_load_32(&job->errCode); pRes->numOfRows = job->resNumOfRows; schReleaseJob(*pJob); - + return TSDB_CODE_SUCCESS; } -int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan* pDag, const char* sql, int64_t *pJob) { +int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan *pDag, const char *sql, int64_t *pJob) { if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } SCH_ERR_RET(schExecJobImpl(transport, pNodeList, pDag, pJob, sql, false)); - + return TSDB_CODE_SUCCESS; } +#if 0 int32_t schedulerConvertDagToTaskList(SQueryPlan* pDag, SArray **pTasks) { if (NULL == pDag || pDag->numOfSubplans <= 0 || LIST_LENGTH(pDag->pSubplans) == 0) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); @@ -1810,14 +1800,14 @@ _return: SCH_RET(code); } +#endif - -int32_t schedulerFetchRows(int64_t job, void** pData) { +int32_t schedulerFetchRows(int64_t job, void **pData) { if (NULL == pData) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - int32_t code = 0; + int32_t code = 0; SSchJob *pJob = schAcquireJob(job); if (NULL == pJob) { qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job); @@ -1861,12 +1851,11 @@ int32_t schedulerFetchRows(int64_t job, void** pData) { SCH_JOB_ELOG("job failed or dropping, status:%d", status); SCH_ERR_JRET(atomic_load_32(&pJob->errCode)); } - + if (pJob->res && ((SRetrieveTableRsp *)pJob->res)->completed) { SCH_ERR_JRET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_SUCCEED)); } - while (true) { *pData = atomic_load_ptr(&pJob->res); if (*pData != atomic_val_compare_exchange_ptr(&pJob->res, *pData, NULL)) { @@ -1891,7 +1880,7 @@ int32_t schedulerFetchRows(int64_t job, void** pData) { _return: atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0); - + schReleaseJob(job); SCH_RET(code); @@ -1944,17 +1933,17 @@ void schedulerFreeTaskList(SArray *taskList) { taosArrayDestroy(taskList); } - + void schedulerDestroy(void) { if (schMgmt.jobRef) { SSchJob *pJob = taosIterateRef(schMgmt.jobRef, 0); - + while (pJob) { taosRemoveRef(schMgmt.jobRef, pJob->refId); - + pJob = taosIterateRef(schMgmt.jobRef, pJob->refId); } - + taosCloseRef(schMgmt.jobRef); schMgmt.jobRef = 0; } diff --git a/source/util/src/tuuid.c b/source/util/src/tuuid.c new file mode 100644 index 0000000000..0405403220 --- /dev/null +++ b/source/util/src/tuuid.c @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tuuid.h" + +static int64_t hashId = 0; +static int32_t SerialNo = 0; + +int32_t tGenIdPI32(void) { + if (hashId == 0) { + char uid[64]; + int32_t code = taosGetSystemUUID(uid, tListLen(uid)); + if (code != TSDB_CODE_SUCCESS) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } else { + hashId = MurmurHash3_32(uid, strlen(uid)); + } + } + + int64_t ts = taosGetTimestampMs(); + uint64_t pid = taosGetPId(); + int32_t val = atomic_add_fetch_32(&SerialNo, 1); + + int32_t id = ((hashId & 0x1F) << 26) | ((pid & 0x3F) << 20) | ((ts & 0xFFF) << 8) | (val & 0xFF); + return id; +} + +int64_t tGenIdPI64(void) { + if (hashId == 0) { + char uid[64]; + int32_t code = taosGetSystemUUID(uid, tListLen(uid)); + if (code != TSDB_CODE_SUCCESS) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } else { + hashId = MurmurHash3_32(uid, strlen(uid)); + } + } + + int64_t ts = taosGetTimestampMs(); + uint64_t pid = taosGetPId(); + int32_t val = atomic_add_fetch_32(&SerialNo, 1); + + int64_t id = ((hashId & 0x07FF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF); + return id; +} -- GitLab