From f263dac186d1a15280186b7ded023ab2d0ec93f1 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Sun, 10 Jul 2022 19:29:26 +0800 Subject: [PATCH] fix: fix taosc crash issue --- include/util/tlockfree.h | 2 +- source/libs/scheduler/inc/schInt.h | 44 +++++++++++++++++++++++++++-- source/libs/scheduler/src/schDbg.c | 1 + source/libs/scheduler/src/schJob.c | 5 ++++ source/libs/scheduler/src/schTask.c | 4 +-- source/util/src/tlockfree.c | 4 ++- 6 files changed, 54 insertions(+), 6 deletions(-) diff --git a/include/util/tlockfree.h b/include/util/tlockfree.h index 44e43f81cf..638499cc60 100644 --- a/include/util/tlockfree.h +++ b/include/util/tlockfree.h @@ -69,7 +69,7 @@ typedef void (*_ref_fn_t)(const void *pObj); #define T_REF_VAL_GET(x) (x)->_ref.val // single writer multiple reader lock -typedef volatile int32_t SRWLatch; +typedef volatile int64_t SRWLatch; /* NOTE(review): widened to 64 bits here, yet this patch still reads the latch with atomic_load_32 (SCH_LOCK/SCH_UNLOCK) and decrements it with atomic_fetch_sub_32 (taosRUnLockLatch) - confirm the 32-bit accesses are intended */ void taosInitRWLatch(SRWLatch *pLatch); void taosWLockLatch(SRWLatch *pLatch); diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 052fdefa61..4b5aac60ea 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -54,6 +54,11 @@ typedef enum { SCH_OP_GET_STATUS, } SCH_OP_TYPE; +typedef struct SSchDebug { /* scheduler debug switches */ + bool lockEnable; /* when true, SCH_LOCK_DEBUG forwards its arguments to qDebug */ + bool apiEnable; /* not referenced anywhere in this patch */ +} SSchDebug; + typedef struct SSchTrans { void *pTrans; void *pHandle; @@ -356,8 +361,41 @@ extern SSchedulerMgmt schMgmt; #define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { SCH_SET_ERRNO(_code); } return _code; } while (0) #define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { SCH_SET_ERRNO(code); goto _return; } } while (0) -#define SCH_LOCK(type, _lock) (SCH_READ == (type) ? taosRLockLatch(_lock) : taosWLockLatch(_lock)) -#define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock)) +#define SCH_LOCK_DEBUG(...) 
do { if (gSCHDebug.lockEnable) { qDebug(__VA_ARGS__); } } while (0) /* logs only while gSCHDebug.lockEnable is set */ + +#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000 /* same value as TD_RWLATCH_WRITE_FLAG in tlockfree.c; the two must be kept in sync by hand */ + +#define SCH_LOCK(type, _lock) do { /* take the read or write latch; asserts the latch value before and after, and traces both through SCH_LOCK_DEBUG */ \ + if (SCH_READ == (type)) { \ + assert(atomic_load_32((_lock)) >= 0); \ + SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + taosRLockLatch(_lock); \ + SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + assert(atomic_load_32((_lock)) > 0); \ + } else { \ + assert(atomic_load_32((_lock)) >= 0); \ + SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + taosWLockLatch(_lock); \ + SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \ + } \ +} while (0) + +#define SCH_UNLOCK(type, _lock) do { /* release the read or write latch; asserts the latch value before and after, and traces both through SCH_LOCK_DEBUG */ \ + if (SCH_READ == (type)) { \ + assert(atomic_load_32((_lock)) > 0); \ + SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + taosRUnLockLatch(_lock); \ + SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + assert(atomic_load_32((_lock)) >= 0); \ + } else { \ + assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \ + SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + taosWUnLockLatch(_lock); \ + SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + assert(atomic_load_32((_lock)) >= 0); \ + } \ +} while (0) void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask); @@ -435,6 +473,8 @@ int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTas int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel, int32_t levelNum); int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask); +extern SSchDebug gSCHDebug; + #ifdef __cplusplus 
} diff --git a/source/libs/scheduler/src/schDbg.c b/source/libs/scheduler/src/schDbg.c index 7f013b8f32..5ecc27ff6e 100644 --- a/source/libs/scheduler/src/schDbg.c +++ b/source/libs/scheduler/src/schDbg.c @@ -17,6 +17,7 @@ #include "schInt.h" tsem_t schdRspSem; +SSchDebug gSCHDebug = {.lockEnable = true}; /* NOTE(review): lock tracing is on by default, so every SCH_LOCK/SCH_UNLOCK issues two qDebug calls - confirm this should ship enabled */ void schdExecCallback(SExecResult* pResult, void* param, int32_t code) { if (code) { diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index d2f9624eee..bba75db376 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -543,9 +543,12 @@ int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) { int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp) { if (rsp->tbFName[0]) { + SCH_LOCK(SCH_WRITE, &pJob->resLock); /* hold resLock for write while execRes.res is created and appended to */ + if (NULL == pJob->execRes.res) { pJob->execRes.res = taosArrayInit(pJob->taskNum, sizeof(STbVerInfo)); if (NULL == pJob->execRes.res) { + SCH_UNLOCK(SCH_WRITE, &pJob->resLock); /* release on the allocation-failure path */ SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } } @@ -557,6 +560,8 @@ int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp) { taosArrayPush((SArray *)pJob->execRes.res, &tbInfo); pJob->execRes.msgType = TDMT_SCH_QUERY; + + SCH_UNLOCK(SCH_WRITE, &pJob->resLock); } return TSDB_CODE_SUCCESS; diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index e1e4ed8769..23c542b670 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -263,7 +263,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { SSchTask *parent = *(SSchTask **)taosArrayGet(pTask->parents, i); int32_t readyNum = atomic_add_fetch_32(&parent->childReady, 1); - SCH_LOCK(SCH_WRITE, &parent->lock); + SCH_LOCK_TASK(parent); SDownstreamSourceNode source = {.type = QUERY_NODE_DOWNSTREAM_SOURCE, .taskId = pTask->taskId, .schedId = schMgmt.sId, @@ -272,7 +272,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { .fetchMsgType = 
SCH_FETCH_TYPE(pTask), }; qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source); - SCH_UNLOCK(SCH_WRITE, &parent->lock); + SCH_UNLOCK_TASK(parent); if (SCH_TASK_READY_FOR_LAUNCH(readyNum, parent)) { SCH_TASK_DLOG("all %d children task done, start to launch parent task 0x%" PRIx64, readyNum, parent->taskId); diff --git a/source/util/src/tlockfree.c b/source/util/src/tlockfree.c index a755a67cc8..55f0211476 100644 --- a/source/util/src/tlockfree.c +++ b/source/util/src/tlockfree.c @@ -17,8 +17,10 @@ #include "tlockfree.h" #define TD_RWLATCH_WRITE_FLAG 0x40000000 +#define TD_RWLATCH_REENTRANT_FLAG 0x4000000000000000 /* bit 62; the initial value stored by taosInitReentrantRWLatch */ void taosInitRWLatch(SRWLatch *pLatch) { *pLatch = 0; } +void taosInitReentrantRWLatch(SRWLatch *pLatch) { *pLatch = TD_RWLATCH_REENTRANT_FLAG; } /* NOTE(review): no prototype for this function is added to tlockfree.h in this patch */ void taosWLockLatch(SRWLatch *pLatch) { SRWLatch oLatch, nLatch; @@ -90,4 +92,4 @@ void taosRLockLatch(SRWLatch *pLatch) { } } -void taosRUnLockLatch(SRWLatch *pLatch) { atomic_fetch_sub_32(pLatch, 1); } \ No newline at end of file +void taosRUnLockLatch(SRWLatch *pLatch) { atomic_fetch_sub_32(pLatch, 1); } -- GitLab