提交 2e3e34f3 编写于 作者: H Hongze Cheng

[TD-3096]<hotfix>: fix query dead lock

上级 62ac3d2e
...@@ -37,6 +37,7 @@ typedef struct { ...@@ -37,6 +37,7 @@ typedef struct {
TSKEY keyLast; TSKEY keyLast;
int64_t numOfRows; int64_t numOfRows;
SSkipList* pData; SSkipList* pData;
T_REF_DECLARE();
} STableData; } STableData;
typedef struct { typedef struct {
...@@ -76,7 +77,7 @@ typedef struct { ...@@ -76,7 +77,7 @@ typedef struct {
int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem); int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem, SArray* pATable);
void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemTable* pMem, SMemTable* pIMem); void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemTable* pMem, SMemTable* pIMem);
void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes); void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes);
int tsdbAsyncCommit(STsdbRepo* pRepo); int tsdbAsyncCommit(STsdbRepo* pRepo);
......
...@@ -124,17 +124,66 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { ...@@ -124,17 +124,66 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
return 0; return 0;
} }
int tsdbTakeMemSnapshot(STsdbRepo *pRepo, SMemTable **pMem, SMemTable **pIMem) { int tsdbTakeMemSnapshot(STsdbRepo *pRepo, SMemTable **pMem, SMemTable **pIMem, SArray *pATable) {
SMemTable *tmem;
// Get snap object
if (tsdbLockRepo(pRepo) < 0) return -1; if (tsdbLockRepo(pRepo) < 0) return -1;
*pMem = pRepo->mem; tmem = pRepo->mem;
*pIMem = pRepo->imem; *pIMem = pRepo->imem;
tsdbRefMemTable(pRepo, *pMem); tsdbRefMemTable(pRepo, tmem);
tsdbRefMemTable(pRepo, *pIMem); tsdbRefMemTable(pRepo, *pIMem);
if (tsdbUnlockRepo(pRepo) < 0) return -1; if (tsdbUnlockRepo(pRepo) < 0) return -1;
if (*pMem != NULL) taosRLockLatch(&((*pMem)->latch)); // Copy mem objects and ref needed STableData
if (tmem) {
taosRLockLatch(&(tmem->latch));
*pMem = (SMemTable *)calloc(1, sizeof(**pMem));
if (*pMem == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
taosRUnLockLatch(&(tmem->latch));
tsdbUnRefMemTable(pRepo, tmem);
tsdbUnRefMemTable(pRepo, *pIMem);
*pMem = NULL;
*pIMem = NULL;
return -1;
}
(*pMem)->tData = (STableData **)calloc(tmem->maxTables, sizeof(STableData *));
if ((*pMem)->tData == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
taosRUnLockLatch(&(tmem->latch));
free(*pMem);
tsdbUnRefMemTable(pRepo, tmem);
tsdbUnRefMemTable(pRepo, *pIMem);
*pMem = NULL;
*pIMem = NULL;
return -1;
}
(*pMem)->keyFirst = tmem->keyFirst;
(*pMem)->keyLast = tmem->keyLast;
(*pMem)->numOfRows = tmem->numOfRows;
(*pMem)->maxTables = tmem->maxTables;
for (size_t i = 0; i < taosArrayGetSize(pATable); i++) {
STable * pTable = *(STable **)taosArrayGet(pATable, i);
int32_t tid = TABLE_TID(pTable);
STableData *pTableData = (tid < tmem->maxTables) ? tmem->tData[tid] : NULL;
if ((pTableData == NULL) || (TABLE_UID(pTable) != pTableData->uid)) continue;
(*pMem)->tData[tid] = tmem->tData[tid];
T_REF_INC(tmem->tData[tid]);
}
taosRUnLockLatch(&(tmem->latch));
}
tsdbUnRefMemTable(pRepo, tmem);
tsdbDebug("vgId:%d take memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), *pMem, *pIMem); tsdbDebug("vgId:%d take memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), *pMem, *pIMem);
return 0; return 0;
...@@ -144,8 +193,14 @@ void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemTable *pMem, SMemTable *pIMem) ...@@ -144,8 +193,14 @@ void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemTable *pMem, SMemTable *pIMem)
tsdbDebug("vgId:%d untake memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), pMem, pIMem); tsdbDebug("vgId:%d untake memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), pMem, pIMem);
if (pMem != NULL) { if (pMem != NULL) {
taosRUnLockLatch(&(pMem->latch)); for (size_t i = 0; i < pMem->maxTables; i++) {
tsdbUnRefMemTable(pRepo, pMem); STableData *pTableData = pMem->tData[i];
if (pTableData) {
tsdbFreeTableData(pTableData);
}
}
free(pMem->tData);
free(pMem);
} }
if (pIMem != NULL) { if (pIMem != NULL) {
...@@ -436,7 +491,7 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) { ...@@ -436,7 +491,7 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) {
STableData *pTableData = (STableData *)calloc(1, sizeof(*pTableData)); STableData *pTableData = (STableData *)calloc(1, sizeof(*pTableData));
if (pTableData == NULL) { if (pTableData == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err; return NULL;
} }
pTableData->uid = TABLE_UID(pTable); pTableData->uid = TABLE_UID(pTable);
...@@ -449,21 +504,23 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) { ...@@ -449,21 +504,23 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) {
tkeyComparFn, pCfg->update ? SL_UPDATE_DUP_KEY : SL_DISCARD_DUP_KEY, tsdbGetTsTupleKey); tkeyComparFn, pCfg->update ? SL_UPDATE_DUP_KEY : SL_DISCARD_DUP_KEY, tsdbGetTsTupleKey);
if (pTableData->pData == NULL) { if (pTableData->pData == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err; free(pTableData);
return NULL;
} }
return pTableData; T_REF_INC(pTableData);
_err: return pTableData;
tsdbFreeTableData(pTableData);
return NULL;
} }
static void tsdbFreeTableData(STableData *pTableData) { static void tsdbFreeTableData(STableData *pTableData) {
if (pTableData) { if (pTableData) {
int32_t ref = T_REF_DEC(pTableData);
if (ref == 0) {
tSkipListDestroy(pTableData->pData); tSkipListDestroy(pTableData->pData);
free(pTableData); free(pTableData);
} }
}
} }
static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple((SDataRow)data); } static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple((SDataRow)data); }
......
...@@ -192,7 +192,7 @@ static void tsdbMayTakeMemSnapshot(STsdbQueryHandle* pQueryHandle) { ...@@ -192,7 +192,7 @@ static void tsdbMayTakeMemSnapshot(STsdbQueryHandle* pQueryHandle) {
SMemRef* pMemRef = pQueryHandle->pMemRef; SMemRef* pMemRef = pQueryHandle->pMemRef;
if (pQueryHandle->pMemRef->ref++ == 0) { if (pQueryHandle->pMemRef->ref++ == 0) {
tsdbTakeMemSnapshot(pQueryHandle->pTsdb, (SMemTable**)&(pMemRef->mem), (SMemTable**)&(pMemRef->imem)); tsdbTakeMemSnapshot(pQueryHandle->pTsdb, (SMemTable**)&(pMemRef->mem), (SMemTable**)&(pMemRef->imem), NULL);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册