#include "dataSinkMgt.h" #include "executor.h" #include "planner.h" #include "query.h" #include "qwInt.h" #include "qwMsg.h" #include "qworker.h" #include "tcommon.h" #include "tmsg.h" #include "tname.h" char *qwPhaseStr(int32_t phase) { switch (phase) { case QW_PHASE_PRE_QUERY: return "PRE_QUERY"; case QW_PHASE_POST_QUERY: return "POST_QUERY"; case QW_PHASE_PRE_FETCH: return "PRE_FETCH"; case QW_PHASE_POST_FETCH: return "POST_FETCH"; case QW_PHASE_PRE_CQUERY: return "PRE_CQUERY"; case QW_PHASE_POST_CQUERY: return "POST_CQUERY"; default: break; } return "UNKNOWN"; } char *qwBufStatusStr(int32_t bufStatus) { switch (bufStatus) { case DS_BUF_LOW: return "LOW"; case DS_BUF_FULL: return "FULL"; case DS_BUF_EMPTY: return "EMPTY"; default: break; } return "UNKNOWN"; } int32_t qwSetTaskStatus(QW_FPARAMS_DEF, SQWTaskStatus *task, int8_t status) { int32_t code = 0; int8_t origStatus = 0; bool ignore = false; while (true) { origStatus = atomic_load_8(&task->status); QW_ERR_RET(qwDbgValidateStatus(QW_FPARAMS(), origStatus, status, &ignore)); if (ignore) { break; } if (origStatus != atomic_val_compare_exchange_8(&task->status, origStatus, status)) { continue; } QW_TASK_DLOG("task status updated from %s to %s", jobTaskStatusStr(origStatus), jobTaskStatusStr(status)); break; } return TSDB_CODE_SUCCESS; } int32_t qwAddSchedulerImpl(SQWorker *mgmt, uint64_t sId, int32_t rwType) { SQWSchStatus newSch = {0}; newSch.tasksHash = taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); newSch.hbBrokenTs = taosGetTimestampMs(); if (NULL == newSch.tasksHash) { QW_SCH_ELOG("taosHashInit %d failed", mgmt->cfg.maxSchTaskNum); QW_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } QW_LOCK(QW_WRITE, &mgmt->schLock); int32_t code = taosHashPut(mgmt->schHash, &sId, sizeof(sId), &newSch, sizeof(newSch)); if (0 != code) { if (!HASH_NODE_EXIST(code)) { QW_UNLOCK(QW_WRITE, &mgmt->schLock); QW_SCH_ELOG("taosHashPut new sch to scheduleHash failed, errno:%d", errno); taosHashCleanup(newSch.tasksHash); QW_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } taosHashCleanup(newSch.tasksHash); } QW_UNLOCK(QW_WRITE, &mgmt->schLock); return TSDB_CODE_SUCCESS; } int32_t qwAcquireSchedulerImpl(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch, int32_t nOpt) { while (true) { QW_LOCK(rwType, &mgmt->schLock); *sch = taosHashGet(mgmt->schHash, &sId, sizeof(sId)); if (NULL == (*sch)) { QW_UNLOCK(rwType, &mgmt->schLock); if (QW_NOT_EXIST_ADD == nOpt) { QW_ERR_RET(qwAddSchedulerImpl(mgmt, sId, rwType)); nOpt = QW_NOT_EXIST_RET_ERR; continue; } else if (QW_NOT_EXIST_RET_ERR == nOpt) { QW_RET(TSDB_CODE_QRY_SCH_NOT_EXIST); } else { QW_SCH_ELOG("unknown notExistOpt:%d", nOpt); QW_ERR_RET(TSDB_CODE_APP_ERROR); } } break; } return TSDB_CODE_SUCCESS; } int32_t qwAcquireAddScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) { return qwAcquireSchedulerImpl(mgmt, sId, rwType, sch, QW_NOT_EXIST_ADD); } int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) { return qwAcquireSchedulerImpl(mgmt, sId, rwType, sch, QW_NOT_EXIST_RET_ERR); } void qwReleaseScheduler(int32_t rwType, SQWorker *mgmt) { QW_UNLOCK(rwType, &mgmt->schLock); } int32_t qwAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, SQWTaskStatus **task) { char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0}; QW_SET_QTID(id, qId, tId, eId); QW_LOCK(rwType, &sch->tasksLock); *task = taosHashGet(sch->tasksHash, id, sizeof(id)); if (NULL == (*task)) { QW_UNLOCK(rwType, &sch->tasksLock); QW_ERR_RET(TSDB_CODE_QRY_TASK_NOT_EXIST); } return TSDB_CODE_SUCCESS; } int32_t qwAddTaskStatusImpl(QW_FPARAMS_DEF, SQWSchStatus *sch, int32_t rwType, int32_t status, SQWTaskStatus **task) { int32_t code = 0; char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0}; QW_SET_QTID(id, qId, tId, eId); SQWTaskStatus ntask = {0}; ntask.status = status; ntask.refId = rId; QW_LOCK(QW_WRITE, &sch->tasksLock); code = taosHashPut(sch->tasksHash, id, sizeof(id), &ntask, sizeof(ntask)); if (0 != code) { QW_UNLOCK(QW_WRITE, &sch->tasksLock); if (HASH_NODE_EXIST(code)) { if (rwType && task) { QW_RET(qwAcquireTaskStatus(QW_FPARAMS(), rwType, sch, task)); } else { QW_TASK_ELOG("task status already exist, newStatus:%s", jobTaskStatusStr(status)); QW_ERR_RET(TSDB_CODE_QRY_TASK_ALREADY_EXIST); } } else { QW_TASK_ELOG("taosHashPut to tasksHash failed, error:%x - %s", code, tstrerror(code)); QW_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } } QW_UNLOCK(QW_WRITE, &sch->tasksLock); QW_TASK_DLOG("task status added, newStatus:%s", jobTaskStatusStr(status)); if (rwType && task) { QW_ERR_RET(qwAcquireTaskStatus(QW_FPARAMS(), rwType, sch, task)); } return TSDB_CODE_SUCCESS; } int32_t qwAddTaskStatus(QW_FPARAMS_DEF, int32_t status) { SQWSchStatus *tsch = NULL; int32_t code = 0; QW_ERR_RET(qwAcquireAddScheduler(mgmt, sId, QW_READ, &tsch)); QW_ERR_JRET(qwAddTaskStatusImpl(QW_FPARAMS(), tsch, 0, status, NULL)); _return: qwReleaseScheduler(QW_READ, mgmt); QW_RET(code); } int32_t qwAddAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, int32_t status, SQWTaskStatus **task) { return qwAddTaskStatusImpl(QW_FPARAMS(), sch, rwType, status, task); } void qwReleaseTaskStatus(int32_t rwType, SQWSchStatus *sch) { QW_UNLOCK(rwType, &sch->tasksLock); } int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0}; QW_SET_QTID(id, qId, tId, eId); *ctx = taosHashAcquire(mgmt->ctxHash, id, sizeof(id)); int8_t nodeStopped = atomic_load_8(&mgmt->nodeStopped); if (NULL == (*ctx)) { if (!nodeStopped) { QW_TASK_DLOG_E("task ctx not exist, may be dropped"); QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST); } else { QW_TASK_DLOG_E("node stopped"); QW_ERR_RET(TSDB_CODE_VND_STOPPED); } } return TSDB_CODE_SUCCESS; } int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0}; QW_SET_QTID(id, qId, tId, eId); *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id)); int8_t nodeStopped = atomic_load_8(&mgmt->nodeStopped); if (NULL == (*ctx)) { if (!nodeStopped) { QW_TASK_DLOG_E("task ctx not exist, may be dropped"); QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST); } else { QW_TASK_DLOG_E("node stopped"); QW_ERR_RET(TSDB_CODE_VND_STOPPED); } } return TSDB_CODE_SUCCESS; } int32_t qwAddTaskCtxImpl(QW_FPARAMS_DEF, bool acquire, SQWTaskCtx **ctx) { char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0}; QW_SET_QTID(id, qId, tId, eId); SQWTaskCtx nctx = {0}; int32_t code = taosHashPut(mgmt->ctxHash, id, sizeof(id), &nctx, sizeof(SQWTaskCtx)); if (0 != code) { if (HASH_NODE_EXIST(code)) { if (acquire && ctx) { QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), ctx)); } else if (ctx) { QW_RET(qwGetTaskCtx(QW_FPARAMS(), ctx)); } else { QW_TASK_ELOG_E("task ctx already exist"); QW_ERR_RET(TSDB_CODE_QRY_TASK_ALREADY_EXIST); } } else { QW_TASK_ELOG("taosHashPut to ctxHash failed, error:%x", code); QW_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } } if (acquire && ctx) { QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), ctx)); } else if (ctx) { QW_RET(qwGetTaskCtx(QW_FPARAMS(), ctx)); } return TSDB_CODE_SUCCESS; } int32_t qwAddTaskCtx(QW_FPARAMS_DEF) { QW_RET(qwAddTaskCtxImpl(QW_FPARAMS(), false, NULL)); } int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { return qwAddTaskCtxImpl(QW_FPARAMS(), true, ctx); } void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx) { taosHashRelease(mgmt->ctxHash, ctx); } void qwFreeTaskHandle(qTaskInfo_t *taskHandle) { // Note: free/kill may in RC qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle); if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) { qDestroyTask(otaskHandle); qDebug("task handle destroyed"); } } int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode) { int32_t code = 0; // Note: free/kill may in RC qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle); if (taskHandle && atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) { qDebug("start to kill task"); code = qAsyncKillTask(taskHandle, rspCode); atomic_store_ptr(&ctx->taskHandle, taskHandle); } QW_RET(code); } void qwFreeTaskCtx(SQWTaskCtx *ctx) { if (ctx->ctrlConnInfo.handle) { tmsgReleaseHandle(&ctx->ctrlConnInfo, TAOS_CONN_SERVER); } ctx->ctrlConnInfo.handle = NULL; ctx->ctrlConnInfo.refId = -1; // NO need to release dataConnInfo qwFreeTaskHandle(&ctx->taskHandle); if (ctx->sinkHandle) { dsDestroyDataSinker(ctx->sinkHandle); ctx->sinkHandle = NULL; qDebug("sink handle destroyed"); } } int32_t qwDropTaskCtx(QW_FPARAMS_DEF) { char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0}; QW_SET_QTID(id, qId, tId, eId); SQWTaskCtx octx; SQWTaskCtx *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id)); if (NULL == ctx) { QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST); } octx = *ctx; atomic_store_ptr(&ctx->taskHandle, NULL); atomic_store_ptr(&ctx->sinkHandle, NULL); QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_DROP); if (taosHashRemove(mgmt->ctxHash, id, sizeof(id))) { QW_TASK_ELOG_E("taosHashRemove from ctx hash failed"); QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST); } qwFreeTaskCtx(&octx); QW_TASK_DLOG_E("task ctx dropped"); return TSDB_CODE_SUCCESS; } int32_t qwDropTaskStatus(QW_FPARAMS_DEF) { SQWSchStatus *sch = NULL; SQWTaskStatus *task = NULL; int32_t code = 0; char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0}; QW_SET_QTID(id, qId, tId, eId); if (qwAcquireScheduler(mgmt, sId, QW_WRITE, &sch)) { QW_TASK_WLOG_E("scheduler does not exist"); return TSDB_CODE_SUCCESS; } if (qwAcquireTaskStatus(QW_FPARAMS(), QW_WRITE, sch, &task)) { qwReleaseScheduler(QW_WRITE, mgmt); QW_TASK_WLOG_E("task does not exist"); return TSDB_CODE_SUCCESS; } if (taosHashRemove(sch->tasksHash, id, sizeof(id))) { QW_TASK_ELOG_E("taosHashRemove task from hash failed"); QW_ERR_JRET(TSDB_CODE_APP_ERROR); } QW_TASK_DLOG_E("task status dropped"); _return: if (task) { qwReleaseTaskStatus(QW_WRITE, sch); } qwReleaseScheduler(QW_WRITE, mgmt); QW_RET(code); } int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status) { SQWSchStatus *sch = NULL; SQWTaskStatus *task = NULL; int32_t code = 0; QW_ERR_RET(qwAcquireScheduler(mgmt, sId, QW_READ, &sch)); QW_ERR_JRET(qwAcquireTaskStatus(QW_FPARAMS(), QW_READ, sch, &task)); QW_ERR_JRET(qwSetTaskStatus(QW_FPARAMS(), task, status)); _return: if (task) { qwReleaseTaskStatus(QW_READ, sch); } qwReleaseScheduler(QW_READ, mgmt); QW_RET(code); } int32_t qwDropTask(QW_FPARAMS_DEF) { QW_ERR_RET(qwDropTaskStatus(QW_FPARAMS())); QW_ERR_RET(qwDropTaskCtx(QW_FPARAMS())); QW_TASK_DLOG_E("task is dropped"); return TSDB_CODE_SUCCESS; } void qwSetHbParam(int64_t refId, SQWHbParam **pParam) { int32_t paramIdx = 0; int32_t newParamIdx = 0; while (true) { paramIdx = atomic_load_32(&gQwMgmt.paramIdx); if (paramIdx == tListLen(gQwMgmt.param)) { newParamIdx = 1; } else { newParamIdx = paramIdx + 1; } if (paramIdx == atomic_val_compare_exchange_32(&gQwMgmt.paramIdx, paramIdx, newParamIdx)) { break; } } if (paramIdx == tListLen(gQwMgmt.param)) { paramIdx = 0; } gQwMgmt.param[paramIdx].qwrId = gQwMgmt.qwRef; gQwMgmt.param[paramIdx].refId = refId; *pParam = &gQwMgmt.param[paramIdx]; } void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) { char dbFName[TSDB_DB_FNAME_LEN] = {0}; char tbName[TSDB_TABLE_NAME_LEN] = {0}; qGetQueryTableSchemaVersion(pTaskInfo, dbFName, tbName, &ctx->tbInfo.sversion, &ctx->tbInfo.tversion); if (dbFName[0] && tbName[0]) { sprintf(ctx->tbInfo.tbFName, "%s.%s", dbFName, tbName); } else { ctx->tbInfo.tbFName[0] = 0; } } void qwCloseRef(void) { taosWLockLatch(&gQwMgmt.lock); if (atomic_load_32(&gQwMgmt.qwNum) <= 0 && gQwMgmt.qwRef >= 0) { taosCloseRef(gQwMgmt.qwRef); gQwMgmt.qwRef = -1; } taosWUnLockLatch(&gQwMgmt.lock); } void qwDestroySchStatus(SQWSchStatus *pStatus) { taosHashCleanup(pStatus->tasksHash); } void qwDestroyImpl(void *pMgmt) { SQWorker *mgmt = (SQWorker *)pMgmt; int8_t nodeType = mgmt->nodeType; int32_t nodeId = mgmt->nodeId; int32_t taskCount = 0; int32_t schStatusCount = 0; qDebug("start to destroy qworker, type:%d, id:%d, handle:%p", nodeType, nodeId, mgmt); taosTmrStop(mgmt->hbTimer); mgmt->hbTimer = NULL; taosTmrCleanUp(mgmt->timer); uint64_t qId, tId; int32_t eId; void *pIter = taosHashIterate(mgmt->ctxHash, NULL); while (pIter) { SQWTaskCtx *ctx = (SQWTaskCtx *)pIter; void *key = taosHashGetKey(pIter, NULL); QW_GET_QTID(key, qId, tId, eId); qwFreeTaskCtx(ctx); QW_TASK_DLOG_E("task ctx freed"); pIter = taosHashIterate(mgmt->ctxHash, pIter); taskCount++; } taosHashCleanup(mgmt->ctxHash); pIter = taosHashIterate(mgmt->schHash, NULL); while (pIter) { SQWSchStatus *sch = (SQWSchStatus *)pIter; qwDestroySchStatus(sch); pIter = taosHashIterate(mgmt->schHash, pIter); schStatusCount++; } taosHashCleanup(mgmt->schHash); *mgmt->destroyed = 1; taosMemoryFree(mgmt); atomic_sub_fetch_32(&gQwMgmt.qwNum, 1); qwCloseRef(); qDebug("qworker destroyed, type:%d, id:%d, handle:%p, taskCount:%d, schStatusCount: %d", nodeType, nodeId, mgmt, taskCount, schStatusCount); } int32_t qwOpenRef(void) { taosWLockLatch(&gQwMgmt.lock); if (gQwMgmt.qwRef < 0) { gQwMgmt.qwRef = taosOpenRef(100, qwDestroyImpl); if (gQwMgmt.qwRef < 0) { taosWUnLockLatch(&gQwMgmt.lock); qError("init qworker ref failed"); QW_RET(TSDB_CODE_OUT_OF_MEMORY); } } taosWUnLockLatch(&gQwMgmt.lock); return TSDB_CODE_SUCCESS; } int32_t qwUpdateTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type) { if (ts <= 0) { return TSDB_CODE_SUCCESS; } int64_t duration = taosGetTimestampUs() - ts; switch (type) { case QUERY_QUEUE: ++mgmt->stat.msgStat.waitTime[0].num; mgmt->stat.msgStat.waitTime[0].total += duration; break; case FETCH_QUEUE: ++mgmt->stat.msgStat.waitTime[1].num; mgmt->stat.msgStat.waitTime[1].total += duration; break; default: qError("unsupported queue type %d", type); return TSDB_CODE_APP_ERROR; } return TSDB_CODE_SUCCESS; } int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type) { SQWTimeInQ *pStat = NULL; switch (type) { case QUERY_QUEUE: pStat = &mgmt->stat.msgStat.waitTime[0]; return pStat->num ? (pStat->total / pStat->num) : 0; case FETCH_QUEUE: pStat = &mgmt->stat.msgStat.waitTime[1]; return pStat->num ? (pStat->total / pStat->num) : 0; default: qError("unsupported queue type %d", type); } return -1; } void qwClearExpiredSch(SQWorker *mgmt, SArray *pExpiredSch) { int32_t num = taosArrayGetSize(pExpiredSch); for (int32_t i = 0; i < num; ++i) { uint64_t *sId = taosArrayGet(pExpiredSch, i); SQWSchStatus *pSch = NULL; if (qwAcquireScheduler(mgmt, *sId, QW_WRITE, &pSch)) { continue; } if (taosHashGetSize(pSch->tasksHash) <= 0) { qwDestroySchStatus(pSch); taosHashRemove(mgmt->schHash, sId, sizeof(*sId)); qDebug("sch %" PRIx64 " destroyed", *sId); } qwReleaseScheduler(QW_WRITE, mgmt); } }