提交 e4e46768 编写于 作者: H Hongze Cheng

[TD-3353]<hotfix>: solve race condition coredump

上级 0c465292
...@@ -25,6 +25,8 @@ ...@@ -25,6 +25,8 @@
#include "tdataformat.h" #include "tdataformat.h"
#include "tname.h" #include "tname.h"
#include "hash.h" #include "hash.h"
#include "tlockfree.h"
#include "tlist.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -172,10 +174,32 @@ typedef struct STsdbQueryCond { ...@@ -172,10 +174,32 @@ typedef struct STsdbQueryCond {
int32_t type; // data block load type: int32_t type; // data block load type:
} STsdbQueryCond; } STsdbQueryCond;
typedef struct STableData STableData;
typedef struct {
T_REF_DECLARE()
SRWLatch latch;
TSKEY keyFirst;
TSKEY keyLast;
int64_t numOfRows;
int32_t maxTables;
STableData **tData;
SList * actList;
SList * extraBuffList;
SList * bufBlockList;
int64_t pointsAdd; // TODO
int64_t storageAdd; // TODO
} SMemTable;
typedef struct {
SMemTable* mem;
SMemTable* imem;
SMemTable mtable;
SMemTable* omem;
} SMemSnapshot;
typedef struct SMemRef { typedef struct SMemRef {
int32_t ref; int32_t ref;
void * mem; SMemSnapshot snapshot;
void * imem;
} SMemRef; } SMemRef;
typedef struct SDataBlockInfo { typedef struct SDataBlockInfo {
......
...@@ -1840,7 +1840,7 @@ static void doFreeQueryHandle(SQueryRuntimeEnv* pRuntimeEnv) { ...@@ -1840,7 +1840,7 @@ static void doFreeQueryHandle(SQueryRuntimeEnv* pRuntimeEnv) {
pRuntimeEnv->pQueryHandle = NULL; pRuntimeEnv->pQueryHandle = NULL;
SMemRef* pMemRef = &pQuery->memRef; SMemRef* pMemRef = &pQuery->memRef;
assert(pMemRef->ref == 0 && pMemRef->imem == NULL && pMemRef->mem == NULL); assert(pMemRef->ref == 0 && pMemRef->snapshot.imem == NULL && pMemRef->snapshot.mem == NULL);
} }
static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
......
...@@ -31,29 +31,14 @@ typedef struct { ...@@ -31,29 +31,14 @@ typedef struct {
SSkipListIterator *pIter; SSkipListIterator *pIter;
} SCommitIter; } SCommitIter;
typedef struct { struct STableData {
uint64_t uid; uint64_t uid;
TSKEY keyFirst; TSKEY keyFirst;
TSKEY keyLast; TSKEY keyLast;
int64_t numOfRows; int64_t numOfRows;
SSkipList* pData; SSkipList* pData;
T_REF_DECLARE() T_REF_DECLARE()
} STableData; };
typedef struct {
T_REF_DECLARE()
SRWLatch latch;
TSKEY keyFirst;
TSKEY keyLast;
int64_t numOfRows;
int32_t maxTables;
STableData** tData;
SList* actList;
SList* extraBuffList;
SList* bufBlockList;
int64_t pointsAdd; // TODO
int64_t storageAdd; // TODO
} SMemTable;
enum { TSDB_UPDATE_META, TSDB_DROP_META }; enum { TSDB_UPDATE_META, TSDB_DROP_META };
...@@ -77,8 +62,8 @@ typedef struct { ...@@ -77,8 +62,8 @@ typedef struct {
int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem, SArray* pATable); int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemSnapshot* pSnapshot, SArray* pATable);
void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemTable* pMem, SMemTable* pIMem); void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemSnapshot* pSnapshot);
void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes); void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes);
int tsdbAsyncCommit(STsdbRepo* pRepo); int tsdbAsyncCommit(STsdbRepo* pRepo);
int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols, int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols,
......
...@@ -124,88 +124,80 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { ...@@ -124,88 +124,80 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
return 0; return 0;
} }
int tsdbTakeMemSnapshot(STsdbRepo *pRepo, SMemTable **pMem, SMemTable **pIMem, SArray *pATable) { int tsdbTakeMemSnapshot(STsdbRepo *pRepo, SMemSnapshot *pSnapshot, SArray *pATable) {
SMemTable *tmem; memset(pSnapshot, 0, sizeof(*pSnapshot));
// Get snap object
if (tsdbLockRepo(pRepo) < 0) return -1; if (tsdbLockRepo(pRepo) < 0) return -1;
tmem = pRepo->mem; pSnapshot->omem = pRepo->mem;
*pIMem = pRepo->imem; pSnapshot->imem = pRepo->imem;
tsdbRefMemTable(pRepo, tmem); tsdbRefMemTable(pRepo, pRepo->mem);
tsdbRefMemTable(pRepo, *pIMem); tsdbRefMemTable(pRepo, pRepo->imem);
if (tsdbUnlockRepo(pRepo) < 0) return -1; if (tsdbUnlockRepo(pRepo) < 0) return -1;
// Copy mem objects and ref needed STableData if (pSnapshot->omem) {
if (tmem) { taosRLockLatch(&(pSnapshot->omem->latch));
taosRLockLatch(&(tmem->latch));
*pMem = (SMemTable *)calloc(1, sizeof(**pMem)); pSnapshot->mem = &(pSnapshot->mtable);
if (*pMem == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
taosRUnLockLatch(&(tmem->latch));
tsdbUnRefMemTable(pRepo, tmem);
tsdbUnRefMemTable(pRepo, *pIMem);
*pMem = NULL;
*pIMem = NULL;
return -1;
}
(*pMem)->tData = (STableData **)calloc(tmem->maxTables, sizeof(STableData *)); pSnapshot->mem->tData = (STableData **)calloc(pSnapshot->omem->maxTables, sizeof(STableData *));
if ((*pMem)->tData == NULL) { if (pSnapshot->mem->tData == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
taosRUnLockLatch(&(tmem->latch)); taosRUnLockLatch(&(pSnapshot->omem->latch));
free(*pMem); tsdbUnRefMemTable(pRepo, pSnapshot->omem);
tsdbUnRefMemTable(pRepo, tmem); tsdbUnRefMemTable(pRepo, pSnapshot->imem);
tsdbUnRefMemTable(pRepo, *pIMem); pSnapshot->mem = NULL;
*pMem = NULL; pSnapshot->imem = NULL;
*pIMem = NULL; pSnapshot->omem = NULL;
return -1; return -1;
} }
(*pMem)->keyFirst = tmem->keyFirst; pSnapshot->mem->keyFirst = pSnapshot->omem->keyFirst;
(*pMem)->keyLast = tmem->keyLast; pSnapshot->mem->keyLast = pSnapshot->omem->keyLast;
(*pMem)->numOfRows = tmem->numOfRows; pSnapshot->mem->numOfRows = pSnapshot->omem->numOfRows;
(*pMem)->maxTables = tmem->maxTables; pSnapshot->mem->maxTables = pSnapshot->omem->maxTables;
for (size_t i = 0; i < taosArrayGetSize(pATable); i++) { for (size_t i = 0; i < taosArrayGetSize(pATable); i++) {
STable * pTable = *(STable **)taosArrayGet(pATable, i); STable * pTable = *(STable **)taosArrayGet(pATable, i);
int32_t tid = TABLE_TID(pTable); int32_t tid = TABLE_TID(pTable);
STableData *pTableData = (tid < tmem->maxTables) ? tmem->tData[tid] : NULL; STableData *pTableData = (tid < pSnapshot->omem->maxTables) ? pSnapshot->omem->tData[tid] : NULL;
if ((pTableData == NULL) || (TABLE_UID(pTable) != pTableData->uid)) continue; if ((pTableData == NULL) || (TABLE_UID(pTable) != pTableData->uid)) continue;
(*pMem)->tData[tid] = tmem->tData[tid]; pSnapshot->mem->tData[tid] = pTableData;
T_REF_INC(tmem->tData[tid]); T_REF_INC(pTableData);
} }
taosRUnLockLatch(&(tmem->latch)); taosRUnLockLatch(&(pSnapshot->omem->latch));
} }
tsdbUnRefMemTable(pRepo, tmem); tsdbDebug("vgId:%d take memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), pSnapshot->omem, pSnapshot->imem);
tsdbDebug("vgId:%d take memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), *pMem, *pIMem);
return 0; return 0;
} }
void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemTable *pMem, SMemTable *pIMem) { void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemSnapshot *pSnapshot) {
tsdbDebug("vgId:%d untake memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), pMem, pIMem); tsdbDebug("vgId:%d untake memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), pSnapshot->omem, pSnapshot->imem);
if (pMem != NULL) { if (pSnapshot->mem) {
for (size_t i = 0; i < pMem->maxTables; i++) { ASSERT(pSnapshot->omem != NULL);
STableData *pTableData = pMem->tData[i];
for (size_t i = 0; i < pSnapshot->mem->maxTables; i++) {
STableData *pTableData = pSnapshot->mem->tData[i];
if (pTableData) { if (pTableData) {
tsdbFreeTableData(pTableData); tsdbFreeTableData(pTableData);
} }
} }
free(pMem->tData); tfree(pSnapshot->mem->tData);
free(pMem);
}
if (pIMem != NULL) { tsdbUnRefMemTable(pRepo, pSnapshot->omem);
tsdbUnRefMemTable(pRepo, pIMem);
} }
tsdbUnRefMemTable(pRepo, pSnapshot->imem);
pSnapshot->mem = NULL;
pSnapshot->imem = NULL;
pSnapshot->omem = NULL;
} }
void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
......
...@@ -194,7 +194,7 @@ static void tsdbMayTakeMemSnapshot(STsdbQueryHandle* pQueryHandle, SArray* psTab ...@@ -194,7 +194,7 @@ static void tsdbMayTakeMemSnapshot(STsdbQueryHandle* pQueryHandle, SArray* psTab
SMemRef* pMemRef = pQueryHandle->pMemRef; SMemRef* pMemRef = pQueryHandle->pMemRef;
if (pQueryHandle->pMemRef->ref++ == 0) { if (pQueryHandle->pMemRef->ref++ == 0) {
tsdbTakeMemSnapshot(pQueryHandle->pTsdb, (SMemTable**)&(pMemRef->mem), (SMemTable**)&(pMemRef->imem), psTable); tsdbTakeMemSnapshot(pQueryHandle->pTsdb, &(pMemRef->snapshot), psTable);
} }
taosArrayDestroy(psTable); taosArrayDestroy(psTable);
...@@ -208,9 +208,7 @@ static void tsdbMayUnTakeMemSnapshot(STsdbQueryHandle* pQueryHandle) { ...@@ -208,9 +208,7 @@ static void tsdbMayUnTakeMemSnapshot(STsdbQueryHandle* pQueryHandle) {
} }
if (--pMemRef->ref == 0) { if (--pMemRef->ref == 0) {
tsdbUnTakeMemSnapShot(pQueryHandle->pTsdb, pMemRef->mem, pMemRef->imem); tsdbUnTakeMemSnapShot(pQueryHandle->pTsdb, &(pMemRef->snapshot));
pMemRef->mem = NULL;
pMemRef->imem = NULL;
} }
pQueryHandle->pMemRef = NULL; pQueryHandle->pMemRef = NULL;
...@@ -229,10 +227,10 @@ int64_t tsdbGetNumOfRowsInMemTable(TsdbQueryHandleT* pHandle) { ...@@ -229,10 +227,10 @@ int64_t tsdbGetNumOfRowsInMemTable(TsdbQueryHandleT* pHandle) {
if (pMemRef == NULL) { return rows; } if (pMemRef == NULL) { return rows; }
STableData* pMem = NULL; STableData* pMem = NULL;
STableData* pIMem = NULL; STableData* pIMem = NULL;
SMemTable *pMemT = (SMemTable *)(pMemRef->mem); SMemTable* pMemT = pMemRef->snapshot.mem;
SMemTable *pIMemT = (SMemTable *)(pMemRef->imem); SMemTable* pIMemT = pMemRef->snapshot.imem;
if (pMemT && pCheckInfo->tableId.tid < pMemT->maxTables) { if (pMemT && pCheckInfo->tableId.tid < pMemT->maxTables) {
pMem = pMemT->tData[pCheckInfo->tableId.tid]; pMem = pMemT->tData[pCheckInfo->tableId.tid];
...@@ -605,7 +603,7 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh ...@@ -605,7 +603,7 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
int32_t order = pHandle->order; int32_t order = pHandle->order;
// no data in buffer, abort // no data in buffer, abort
if (pHandle->pMemRef->mem == NULL && pHandle->pMemRef->imem == NULL) { if (pHandle->pMemRef->snapshot.mem == NULL && pHandle->pMemRef->snapshot.imem == NULL) {
return false; return false;
} }
...@@ -614,8 +612,8 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh ...@@ -614,8 +612,8 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
STableData* pMem = NULL; STableData* pMem = NULL;
STableData* pIMem = NULL; STableData* pIMem = NULL;
SMemTable* pMemT = pHandle->pMemRef->mem; SMemTable* pMemT = pHandle->pMemRef->snapshot.mem;
SMemTable* pIMemT = pHandle->pMemRef->imem; SMemTable* pIMemT = pHandle->pMemRef->snapshot.imem;
if (pMemT && pCheckInfo->tableId.tid < pMemT->maxTables) { if (pMemT && pCheckInfo->tableId.tid < pMemT->maxTables) {
pMem = pMemT->tData[pCheckInfo->tableId.tid]; pMem = pMemT->tData[pCheckInfo->tableId.tid];
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册