提交 f263dac1 编写于 作者: D dapan1121

fix: fix taosc crash issue

上级 4d99188e
......@@ -69,7 +69,7 @@ typedef void (*_ref_fn_t)(const void *pObj);
#define T_REF_VAL_GET(x) (x)->_ref.val
// single writer multiple reader lock
typedef volatile int32_t SRWLatch;
typedef volatile int64_t SRWLatch;
void taosInitRWLatch(SRWLatch *pLatch);
void taosWLockLatch(SRWLatch *pLatch);
......
......@@ -54,6 +54,11 @@ typedef enum {
SCH_OP_GET_STATUS,
} SCH_OP_TYPE;
typedef struct SSchDebug {
bool lockEnable;
bool apiEnable;
} SSchDebug;
typedef struct SSchTrans {
void *pTrans;
void *pHandle;
......@@ -356,8 +361,41 @@ extern SSchedulerMgmt schMgmt;
#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { SCH_SET_ERRNO(_code); } return _code; } while (0)
#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { SCH_SET_ERRNO(code); goto _return; } } while (0)
#define SCH_LOCK(type, _lock) (SCH_READ == (type) ? taosRLockLatch(_lock) : taosWLockLatch(_lock))
#define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock))
#define SCH_LOCK_DEBUG(...) do { if (gSCHDebug.lockEnable) { qDebug(__VA_ARGS__); } } while (0)
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
#define SCH_LOCK(type, _lock) do { \
if (SCH_READ == (type)) { \
assert(atomic_load_32((_lock)) >= 0); \
SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRLockLatch(_lock); \
SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) > 0); \
} else { \
assert(atomic_load_32((_lock)) >= 0); \
SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWLockLatch(_lock); \
SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
} \
} while (0)
#define SCH_UNLOCK(type, _lock) do { \
if (SCH_READ == (type)) { \
assert(atomic_load_32((_lock)) > 0); \
SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRUnLockLatch(_lock); \
SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
} else { \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWUnLockLatch(_lock); \
SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
} \
} while (0)
void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask);
......@@ -435,6 +473,8 @@ int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTas
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel, int32_t levelNum);
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask);
extern SSchDebug gSCHDebug;
#ifdef __cplusplus
}
......
......@@ -17,6 +17,7 @@
#include "schInt.h"
tsem_t schdRspSem;
SSchDebug gSCHDebug = {.lockEnable = true};
void schdExecCallback(SExecResult* pResult, void* param, int32_t code) {
if (code) {
......
......@@ -543,9 +543,12 @@ int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) {
int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp) {
if (rsp->tbFName[0]) {
SCH_LOCK(SCH_WRITE, &pJob->resLock);
if (NULL == pJob->execRes.res) {
pJob->execRes.res = taosArrayInit(pJob->taskNum, sizeof(STbVerInfo));
if (NULL == pJob->execRes.res) {
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
}
......@@ -557,6 +560,8 @@ int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp) {
taosArrayPush((SArray *)pJob->execRes.res, &tbInfo);
pJob->execRes.msgType = TDMT_SCH_QUERY;
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
}
return TSDB_CODE_SUCCESS;
......
......@@ -263,7 +263,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
SSchTask *parent = *(SSchTask **)taosArrayGet(pTask->parents, i);
int32_t readyNum = atomic_add_fetch_32(&parent->childReady, 1);
SCH_LOCK(SCH_WRITE, &parent->lock);
SCH_LOCK_TASK(parent);
SDownstreamSourceNode source = {.type = QUERY_NODE_DOWNSTREAM_SOURCE,
.taskId = pTask->taskId,
.schedId = schMgmt.sId,
......@@ -272,7 +272,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
.fetchMsgType = SCH_FETCH_TYPE(pTask),
};
qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source);
SCH_UNLOCK(SCH_WRITE, &parent->lock);
SCH_UNLOCK_TASK(parent);
if (SCH_TASK_READY_FOR_LAUNCH(readyNum, parent)) {
SCH_TASK_DLOG("all %d children task done, start to launch parent task 0x%" PRIx64, readyNum, parent->taskId);
......
......@@ -17,8 +17,10 @@
#include "tlockfree.h"
#define TD_RWLATCH_WRITE_FLAG 0x40000000
#define TD_RWLATCH_REENTRANT_FLAG 0x4000000000000000
void taosInitRWLatch(SRWLatch *pLatch) { *pLatch = 0; }
void taosInitReentrantRWLatch(SRWLatch *pLatch) { *pLatch = 0x4000000000000000; }
void taosWLockLatch(SRWLatch *pLatch) {
SRWLatch oLatch, nLatch;
......@@ -90,4 +92,4 @@ void taosRLockLatch(SRWLatch *pLatch) {
}
}
void taosRUnLockLatch(SRWLatch *pLatch) { atomic_fetch_sub_32(pLatch, 1); }
\ No newline at end of file
void taosRUnLockLatch(SRWLatch *pLatch) { atomic_fetch_sub_32(pLatch, 1); }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册