提交 251fdd35 编写于 作者: M Minglei Jin

fix(tsdb/read): wrap tsdb reader's lock with acquire/release

上级 101772fe
......@@ -573,6 +573,63 @@ static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity)
return pResBlock;
}
static int32_t tsdbInitReaderLock(STsdbReader* pReader) {
int32_t code = -1;
qTrace("tsdb/read: %p, pre-init read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
code = taosThreadMutexInit(&pReader->readerMutex, NULL);
qTrace("tsdb/read: %p, post-init read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
return code;
}
static int32_t tsdbUninitReaderLock(STsdbReader* pReader) {
int32_t code = -1;
qTrace("tsdb/read: %p, pre-uninit read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
code = taosThreadMutexDestroy(&pReader->readerMutex);
qTrace("tsdb/read: %p, post-uninit read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
return code;
}
static int32_t tsdbAcquireReader(STsdbReader* pReader) {
int32_t code = -1;
qTrace("tsdb/read: %p, pre-take read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
code = taosThreadMutexLock(&pReader->readerMutex);
qTrace("tsdb/read: %p, post-take read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
return code;
}
static int32_t tsdbTryAcquireReader(STsdbReader* pReader) {
int32_t code = -1;
qTrace("tsdb/read: %p, pre-trytake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
code = taosThreadMutexTryLock(&pReader->readerMutex);
qTrace("tsdb/read: %p, post-trytake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
return code;
}
static int32_t tsdbReleaseReader(STsdbReader* pReader) {
int32_t code = -1;
qTrace("tsdb/read: %p, pre-untake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
code = taosThreadMutexUnlock(&pReader->readerMutex);
qTrace("tsdb/read: %p, post-untake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
return code;
}
void tsdbReleaseDataBlock(STsdbReader* pReader) { tsdbReleaseReader(pReader); }
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
SSDataBlock* pResBlock, const char* idstr) {
int32_t code = 0;
......@@ -636,7 +693,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
setColumnIdSlotList(&pReader->suppInfo, pCond->colList, pCond->pSlotList, pCond->numOfCols);
taosThreadMutexInit(&pReader->readerMutex, NULL);
tsdbInitReaderLock(pReader);
*ppReader = pReader;
return code;
......@@ -4015,7 +4072,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
tsdbUntakeReadSnap(pReader, pReader->pReadSnap, true);
pReader->pReadSnap = NULL;
taosThreadMutexDestroy(&pReader->readerMutex);
tsdbUninitReaderLock(pReader);
taosMemoryFree(pReader->status.uidCheckInfo.tableUidList);
SIOCostSummary* pCost = &pReader->cost;
......@@ -4162,16 +4219,16 @@ static int32_t tsdbSetQueryReseek(void* pQHandle) {
int32_t code = 0;
STsdbReader* pReader = pQHandle;
code = taosThreadMutexTryLock(&pReader->readerMutex);
code = tsdbTryAcquireReader(pReader);
if (code == 0) {
if (pReader->suspended) {
taosThreadMutexUnlock(&pReader->readerMutex);
tsdbReleaseReader(pReader);
return code;
}
tsdbReaderSuspend(pReader);
taosThreadMutexUnlock(&pReader->readerMutex);
tsdbReleaseReader(pReader);
return code;
} else if (code == EBUSY) {
......@@ -4272,8 +4329,9 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status;
qTrace("tsdb/read: %p, take read mutex", pReader);
taosThreadMutexLock(&pReader->readerMutex);
int32_t code = tsdbAcquireReader(pReader);
qTrace("tsdb/read: %p, take read mutex, code: %d", pReader, code);
if (pReader->suspended) {
tsdbReaderResume(pReader);
}
......@@ -4285,7 +4343,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
pStatus = &pReader->innerReader[0]->status;
if (pStatus->composedDataBlock) {
qTrace("tsdb/read: %p, unlock read mutex", pReader);
taosThreadMutexUnlock(&pReader->readerMutex);
tsdbReleaseReader(pReader);
}
return ret;
......@@ -4308,7 +4366,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
if (ret) {
if (pStatus->composedDataBlock) {
qTrace("tsdb/read: %p, unlock read mutex", pReader);
taosThreadMutexUnlock(&pReader->readerMutex);
tsdbReleaseReader(pReader);
}
return ret;
......@@ -4328,7 +4386,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
pStatus = &pReader->innerReader[1]->status;
if (pStatus->composedDataBlock) {
qTrace("tsdb/read: %p, unlock read mutex", pReader);
taosThreadMutexUnlock(&pReader->readerMutex);
tsdbReleaseReader(pReader);
}
return ret1;
......@@ -4336,7 +4394,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
}
qTrace("tsdb/read: %p, unlock read mutex", pReader);
taosThreadMutexUnlock(&pReader->readerMutex);
tsdbReleaseReader(pReader);
return false;
}
......@@ -4495,13 +4553,6 @@ static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) {
return pReader->pResBlock;
}
void tsdbReleaseDataBlock(STsdbReader* pReader) {
// SReaderStatus* pStatus = &pReader->status;
// if (!pStatus->composedDataBlock) {
taosThreadMutexUnlock(&pReader->readerMutex);
//}
}
SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
STsdbReader* pTReader = pReader;
if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
......@@ -4520,7 +4571,7 @@ SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
SSDataBlock* ret = doRetrieveDataBlock(pTReader);
qTrace("tsdb/read-retrieve: %p, unlock read mutex", pReader);
taosThreadMutexUnlock(&pReader->readerMutex);
tsdbReleaseReader(pReader);
return ret;
}
......@@ -4529,7 +4580,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
SReaderStatus* pStatus = &pReader->status;
qTrace("tsdb/reader-reset: %p, take read mutex", pReader);
taosThreadMutexLock(&pReader->readerMutex);
tsdbAcquireReader(pReader);
if (pReader->suspended) {
tsdbReaderResume(pReader);
......@@ -4538,7 +4589,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) {
tsdbDebug("tsdb reader reset return %p", pReader->pReadSnap);
taosThreadMutexUnlock(&pReader->readerMutex);
tsdbReleaseReader(pReader);
return TSDB_CODE_SUCCESS;
}
......@@ -4576,7 +4627,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader,
numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
taosThreadMutexUnlock(&pReader->readerMutex);
tsdbReleaseReader(pReader);
return code;
}
......@@ -4587,7 +4638,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
pReader, pReader->suid, numOfTables, pCond->twindows.skey, pReader->window.skey, pReader->window.ekey,
pReader->idStr);
taosThreadMutexUnlock(&pReader->readerMutex);
tsdbReleaseReader(pReader);
return code;
}
......@@ -4672,7 +4723,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
int64_t rows = 0;
SReaderStatus* pStatus = &pReader->status;
taosThreadMutexLock(&pReader->readerMutex);
tsdbAcquireReader(pReader);
if (pReader->suspended) {
tsdbReaderResume(pReader);
}
......@@ -4702,7 +4753,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
}
taosThreadMutexUnlock(&pReader->readerMutex);
tsdbReleaseReader(pReader);
return rows;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册