From a4d4cd2a84c89cc7a5a8c1217b23ca122835a6e1 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 29 Apr 2022 11:31:39 +0800 Subject: [PATCH] stmt query --- source/libs/scalar/src/scalar.c | 10 ++++++- source/libs/scheduler/inc/schedulerInt.h | 2 ++ source/libs/scheduler/src/scheduler.c | 35 ++++++++++++++++++------ 3 files changed, 38 insertions(+), 9 deletions(-) diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index ba71c4ae3c..f5fab814ff 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -75,7 +75,15 @@ int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type) { if (valueNode->node.resType.type != type) { out.columnData->info.type = type; - out.columnData->info.bytes = tDataTypes[type].bytes; + if (IS_VAR_DATA_TYPE(type)) { + if (IS_VAR_DATA_TYPE(valueNode->node.resType.type)) { + out.columnData->info.bytes = valueNode->node.resType.bytes * TSDB_NCHAR_SIZE; + } else { + out.columnData->info.bytes = 64 * TSDB_NCHAR_SIZE; + } + } else { + out.columnData->info.bytes = tDataTypes[type].bytes; + } code = doConvertDataType(valueNode, &out); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 0f6961018c..f4d6872969 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -85,7 +85,9 @@ typedef struct SSchedulerMgmt { uint64_t taskId; // sequential taksId uint64_t sId; // schedulerId SSchedulerCfg cfg; + SRWLatch lock; int32_t jobRef; + int32_t jobNum; SSchedulerStat stat; SHashObj *hbConnections; } SSchedulerMgmt; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 31d2da9380..aa4e598d7d 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -21,7 +21,9 @@ #include "tref.h" #include "trpc.h" -SSchedulerMgmt schMgmt = {0}; +SSchedulerMgmt schMgmt = { + .jobRef = -1, +}; FORCE_INLINE SSchJob *schAcquireJob(int64_t refId) { return (SSchJob *)taosAcquireRef(schMgmt.jobRef, refId); } @@ -70,6 +72,7 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel * int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray *pNodeList, const char *sql, int64_t startTs, bool syncSchedule) { int32_t code = 0; + int64_t refId = -1; SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob)); if (NULL == pJob) { qError("QID:%" PRIx64 " calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob)); @@ -114,15 +117,17 @@ int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray tsem_init(&pJob->rspSem, 0, 0); - int64_t refId = taosAddRef(schMgmt.jobRef, pJob); + refId = taosAddRef(schMgmt.jobRef, pJob); if (refId < 0) { SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno)); SCH_ERR_JRET(terrno); } + atomic_add_fetch_32(&schMgmt.jobNum, 1); + if (NULL == schAcquireJob(refId)) { SCH_JOB_ELOG("schAcquireJob job failed, refId:%" PRIx64, refId); - SCH_RET(TSDB_CODE_SCH_STATUS_ERROR); + SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); } pJob->refId = refId; @@ -137,7 +142,11 @@ int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray _return: - schFreeJobImpl(pJob); + if (refId < 0) { + schFreeJobImpl(pJob); + } else { + taosRemoveRef(schMgmt.jobRef, refId); + } SCH_RET(code); } @@ -2239,6 +2248,15 @@ int32_t schCancelJob(SSchJob *pJob) { // TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST } +void schCloseJobRef(void) { + SCH_LOCK(SCH_WRITE, &schMgmt.lock); + if (atomic_load_32(&schMgmt.jobNum) <= 0 && schMgmt.jobRef >= 0) { + taosCloseRef(schMgmt.jobRef); + schMgmt.jobRef = -1; + } + SCH_UNLOCK(SCH_WRITE, &schMgmt.lock); +} + void schFreeJobImpl(void *job) { if (NULL == job) { return; @@ -2284,6 +2302,10 @@ void schFreeJobImpl(void *job) { taosMemoryFreeClear(pJob); qDebug("QID:0x%" PRIx64 " job freed, refId:%" PRIx64 ", pointer:%p", queryId, refId, pJob); + + atomic_sub_fetch_32(&schMgmt.jobNum, 1); + + schCloseJobRef(); } static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, @@ -2732,7 +2754,7 @@ void schedulerFreeTaskList(SArray *taskList) { } void schedulerDestroy(void) { - if (schMgmt.jobRef) { + if (schMgmt.jobRef >= 0) { SSchJob *pJob = taosIterateRef(schMgmt.jobRef, 0); int64_t refId = 0; @@ -2745,9 +2767,6 @@ void schedulerDestroy(void) { pJob = taosIterateRef(schMgmt.jobRef, refId); } - - taosCloseRef(schMgmt.jobRef); - schMgmt.jobRef = 0; } if (schMgmt.hbConnections) { -- GitLab