提交 def96ee3 编写于 作者: D dapan1121

fix: fix taos crash and deak lock issue

上级 f263dac1
......@@ -71,11 +71,12 @@ typedef void (*_ref_fn_t)(const void *pObj);
// single writer multiple reader lock
typedef volatile int64_t SRWLatch;
void taosInitRWLatch(SRWLatch *pLatch);
void taosWLockLatch(SRWLatch *pLatch);
void taosWUnLockLatch(SRWLatch *pLatch);
void taosRLockLatch(SRWLatch *pLatch);
void taosRUnLockLatch(SRWLatch *pLatch);
void taosInitRWLatch(SRWLatch *pLatch);
void taosInitReentrantRWLatch(SRWLatch *pLatch);
void taosWLockLatch(SRWLatch *pLatch);
void taosWUnLockLatch(SRWLatch *pLatch);
void taosRLockLatch(SRWLatch *pLatch);
void taosRUnLockLatch(SRWLatch *pLatch);
int32_t taosWTryLockLatch(SRWLatch *pLatch);
// copy on read
......
......@@ -480,37 +480,35 @@ typedef struct SCtgOperation {
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
#define CTG_IS_LOCKED(_lock) atomic_load_32((_lock))
#define CTG_LOCK(type, _lock) do { \
if (CTG_READ == (type)) { \
assert(atomic_load_32((_lock)) >= 0); \
CTG_LOCK_DEBUG("CTG RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_64((_lock)) >= 0); \
CTG_LOCK_DEBUG("CTG RLOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
taosRLockLatch(_lock); \
CTG_LOCK_DEBUG("CTG RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) > 0); \
CTG_LOCK_DEBUG("CTG RLOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
assert(atomic_load_64((_lock)) > 0); \
} else { \
assert(atomic_load_32((_lock)) >= 0); \
CTG_LOCK_DEBUG("CTG WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_64((_lock)) >= 0); \
CTG_LOCK_DEBUG("CTG WLOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
taosWLockLatch(_lock); \
CTG_LOCK_DEBUG("CTG WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
CTG_LOCK_DEBUG("CTG WLOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
assert(atomic_load_64((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
} \
} while (0)
#define CTG_UNLOCK(type, _lock) do { \
if (CTG_READ == (type)) { \
assert(atomic_load_32((_lock)) > 0); \
CTG_LOCK_DEBUG("CTG RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_64((_lock)) > 0); \
CTG_LOCK_DEBUG("CTG RULOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
taosRUnLockLatch(_lock); \
CTG_LOCK_DEBUG("CTG RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
CTG_LOCK_DEBUG("CTG RULOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
assert(atomic_load_64((_lock)) >= 0); \
} else { \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
CTG_LOCK_DEBUG("CTG WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_64((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
CTG_LOCK_DEBUG("CTG WULOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
taosWUnLockLatch(_lock); \
CTG_LOCK_DEBUG("CTG WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
CTG_LOCK_DEBUG("CTG WULOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
assert(atomic_load_64((_lock)) >= 0); \
} \
} while (0)
......
......@@ -316,34 +316,34 @@ typedef struct SQWorkerMgmt {
#define QW_LOCK(type, _lock) \
do { \
if (QW_READ == (type)) { \
assert(atomic_load_32((_lock)) >= 0); \
QW_LOCK_DEBUG("QW RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_64((_lock)) >= 0); \
QW_LOCK_DEBUG("QW RLOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
taosRLockLatch(_lock); \
QW_LOCK_DEBUG("QW RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) > 0); \
QW_LOCK_DEBUG("QW RLOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
assert(atomic_load_64((_lock)) > 0); \
} else { \
assert(atomic_load_32((_lock)) >= 0); \
QW_LOCK_DEBUG("QW WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_64((_lock)) >= 0); \
QW_LOCK_DEBUG("QW WLOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
taosWLockLatch(_lock); \
QW_LOCK_DEBUG("QW WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
QW_LOCK_DEBUG("QW WLOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
assert(atomic_load_64((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
} \
} while (0)
#define QW_UNLOCK(type, _lock) \
do { \
if (QW_READ == (type)) { \
assert(atomic_load_32((_lock)) > 0); \
QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_64((_lock)) > 0); \
QW_LOCK_DEBUG("QW RULOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
taosRUnLockLatch(_lock); \
QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
QW_LOCK_DEBUG("QW RULOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
assert(atomic_load_64((_lock)) >= 0); \
} else { \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_64((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
QW_LOCK_DEBUG("QW WULOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
taosWUnLockLatch(_lock); \
QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
QW_LOCK_DEBUG("QW WULOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
assert(atomic_load_64((_lock)) >= 0); \
} \
} while (0)
......
......@@ -191,7 +191,7 @@ typedef struct SSchTaskProfile {
typedef struct SSchTask {
uint64_t taskId; // task id
SRWLatch lock; // task lock
SRWLatch lock; // task reentrant lock
int32_t maxExecTimes; // task may exec times
int32_t execId; // task current execute try index
SSchLevel *level; // level
......@@ -367,33 +367,33 @@ extern SSchedulerMgmt schMgmt;
#define SCH_LOCK(type, _lock) do { \
if (SCH_READ == (type)) { \
assert(atomic_load_32((_lock)) >= 0); \
SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_64(_lock) >= 0); \
SCH_LOCK_DEBUG("SCH RLOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
taosRLockLatch(_lock); \
SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) > 0); \
SCH_LOCK_DEBUG("SCH RLOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
assert(atomic_load_64(_lock) > 0); \
} else { \
assert(atomic_load_32((_lock)) >= 0); \
SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_64(_lock) >= 0); \
SCH_LOCK_DEBUG("SCH WLOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
taosWLockLatch(_lock); \
SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
SCH_LOCK_DEBUG("SCH WLOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
assert(atomic_load_64(_lock) & TD_RWLATCH_WRITE_FLAG_COPY); \
} \
} while (0)
#define SCH_UNLOCK(type, _lock) do { \
if (SCH_READ == (type)) { \
assert(atomic_load_32((_lock)) > 0); \
SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_64((_lock)) > 0); \
SCH_LOCK_DEBUG("SCH RULOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
taosRUnLockLatch(_lock); \
SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
SCH_LOCK_DEBUG("SCH RULOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
assert(atomic_load_64((_lock)) >= 0); \
} else { \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_64((_lock)) & TD_RWLATCH_WRITE_FLAG_COPY); \
SCH_LOCK_DEBUG("SCH WULOCK%p:%" PRIx64 ", %s:%d B", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
taosWUnLockLatch(_lock); \
SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
SCH_LOCK_DEBUG("SCH WULOCK%p:%" PRIx64 ", %s:%d E", (_lock), atomic_load_64(_lock), __FILE__, __LINE__); \
assert(atomic_load_64((_lock)) >= 0); \
} \
} while (0)
......
......@@ -17,7 +17,7 @@
#include "schInt.h"
tsem_t schdRspSem;
SSchDebug gSCHDebug = {.lockEnable = true};
SSchDebug gSCHDebug = {0};
void schdExecCallback(SExecResult* pResult, void* param, int32_t code) {
if (code) {
......
......@@ -337,14 +337,14 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
SCH_SET_JOB_TYPE(pJob, plan->subplanType);
SSchTask task = {0};
SCH_ERR_JRET(schInitTask(pJob, &task, plan, pLevel, levelNum));
SSchTask *pTask = taosArrayPush(pLevel->subTasks, &task);
if (NULL == pTask) {
SCH_TASK_ELOG("taosArrayPush task to level failed, level:%d, taskIdx:%d", pLevel->level, n);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_ERR_JRET(schInitTask(pJob, pTask, plan, pLevel, levelNum));
SCH_ERR_JRET(schAppendJobDataSrc(pJob, pTask));
if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &pTask, POINTER_BYTES)) {
......
......@@ -60,6 +60,7 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *
if (NULL == pTask->execNodes || NULL == pTask->profile.execTime) {
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
taosInitReentrantRWLatch(&pTask->lock);
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
......
......@@ -20,7 +20,7 @@
#define TD_RWLATCH_REENTRANT_FLAG 0x4000000000000000
void taosInitRWLatch(SRWLatch *pLatch) { *pLatch = 0; }
void taosInitReentrantRWLatch(SRWLatch *pLatch) { *pLatch = 0x4000000000000000; }
void taosInitReentrantRWLatch(SRWLatch *pLatch) { *pLatch = TD_RWLATCH_REENTRANT_FLAG; }
void taosWLockLatch(SRWLatch *pLatch) {
SRWLatch oLatch, nLatch;
......@@ -28,8 +28,14 @@ void taosWLockLatch(SRWLatch *pLatch) {
// Set write flag
while (1) {
oLatch = atomic_load_32(pLatch);
oLatch = atomic_load_64(pLatch);
if (oLatch & TD_RWLATCH_WRITE_FLAG) {
if (oLatch & TD_RWLATCH_REENTRANT_FLAG) {
nLatch = (((oLatch >> 32) + 1) << 32) | (oLatch & 0xFFFFFFFF);
if (atomic_val_compare_exchange_64(pLatch, oLatch, nLatch) == oLatch) break;
continue;
}
nLoops++;
if (nLoops > 1000) {
sched_yield();
......@@ -39,14 +45,14 @@ void taosWLockLatch(SRWLatch *pLatch) {
}
nLatch = oLatch | TD_RWLATCH_WRITE_FLAG;
if (atomic_val_compare_exchange_32(pLatch, oLatch, nLatch) == oLatch) break;
if (atomic_val_compare_exchange_64(pLatch, oLatch, nLatch) == oLatch) break;
}
// wait for all reads end
nLoops = 0;
while (1) {
oLatch = atomic_load_32(pLatch);
if (oLatch == TD_RWLATCH_WRITE_FLAG) break;
oLatch = atomic_load_64(pLatch);
if (0 == (oLatch & 0xFFFFFFF)) break;
nLoops++;
if (nLoops > 1000) {
sched_yield();
......@@ -55,29 +61,50 @@ void taosWLockLatch(SRWLatch *pLatch) {
}
}
// no reentrant
int32_t taosWTryLockLatch(SRWLatch *pLatch) {
SRWLatch oLatch, nLatch;
oLatch = atomic_load_32(pLatch);
if (oLatch) {
oLatch = atomic_load_64(pLatch);
if (oLatch << 2) {
return -1;
}
nLatch = oLatch | TD_RWLATCH_WRITE_FLAG;
if (atomic_val_compare_exchange_32(pLatch, oLatch, nLatch) == oLatch) {
if (atomic_val_compare_exchange_64(pLatch, oLatch, nLatch) == oLatch) {
return 0;
}
return -1;
}
void taosWUnLockLatch(SRWLatch *pLatch) { atomic_store_32(pLatch, 0); }
void taosWUnLockLatch(SRWLatch *pLatch) {
SRWLatch oLatch, nLatch, wLatch;
while (1) {
oLatch = atomic_load_64(pLatch);
if (0 == (oLatch & TD_RWLATCH_REENTRANT_FLAG)) {
atomic_store_64(pLatch, 0);
break;
}
wLatch = ((oLatch << 2) >> 34);
if (wLatch) {
nLatch = ((--wLatch) << 32) | TD_RWLATCH_REENTRANT_FLAG | TD_RWLATCH_WRITE_FLAG;
} else {
nLatch = TD_RWLATCH_REENTRANT_FLAG;
}
if (atomic_val_compare_exchange_64(pLatch, oLatch, nLatch) == oLatch) break;
}
}
void taosRLockLatch(SRWLatch *pLatch) {
SRWLatch oLatch, nLatch;
int32_t nLoops = 0;
while (1) {
oLatch = atomic_load_32(pLatch);
oLatch = atomic_load_64(pLatch);
if (oLatch & TD_RWLATCH_WRITE_FLAG) {
nLoops++;
if (nLoops > 1000) {
......@@ -88,8 +115,8 @@ void taosRLockLatch(SRWLatch *pLatch) {
}
nLatch = oLatch + 1;
if (atomic_val_compare_exchange_32(pLatch, oLatch, nLatch) == oLatch) break;
if (atomic_val_compare_exchange_64(pLatch, oLatch, nLatch) == oLatch) break;
}
}
void taosRUnLockLatch(SRWLatch *pLatch) { atomic_fetch_sub_32(pLatch, 1); }
void taosRUnLockLatch(SRWLatch *pLatch) { atomic_fetch_sub_64(pLatch, 1); }
......@@ -32,9 +32,9 @@ class TDTestCase:
#
# --------------- main frame -------------------
#
clientCfgDict = {'queryPolicy': '1','debugFlag': 135}
clientCfgDict = {'queryPolicy': '1','debugFlag': 143}
clientCfgDict["queryPolicy"] = '1'
clientCfgDict["debugFlag"] = 131
clientCfgDict["debugFlag"] = 143
updatecfgDict = {'clientCfg': {}}
updatecfgDict = {'debugFlag': 143}
......@@ -480,4 +480,4 @@ class TDTestCase:
# add case with filename
#
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
\ No newline at end of file
tdCases.addLinux(__file__, TDTestCase())
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册