From 6413285bb9c14668ba1299e4cb643132636a5dd8 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sat, 13 Jun 2020 02:43:15 +0000 Subject: [PATCH] TD-353 --- src/tsdb/inc/tsdbMain.h | 6 ++- src/tsdb/src/tsdbMain.c | 4 ++ src/tsdb/src/tsdbMemTable.c | 91 +++++++++++++++++++++---------------- 3 files changed, 60 insertions(+), 41 deletions(-) diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 6055f30aad..40c5ddbab5 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -254,7 +254,7 @@ typedef struct { // ------------------ tsdbMain.c typedef struct { - int8_t state; + int8_t state; char* rootDir; STsdbCfg config; @@ -265,9 +265,10 @@ typedef struct { SMemTable* mem; SMemTable* imem; STsdbFileH* tsdbFileH; - pthread_mutex_t mutex; int commit; pthread_t commitThread; + pthread_mutex_t mutex; + bool repoLocked; } STsdbRepo; // Operations @@ -309,6 +310,7 @@ void tsdbFreeFileH(STsdbFileH* pFileH); // ------------------ tsdbMain.c #define REPO_ID(r) (r)->config.tsdbId +#define IS_REPO_LOCKED(r) (r)->repoLocked char* tsdbGetMetaFileName(char* rootDir); int tsdbLockRepo(STsdbRepo* pRepo); diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 36595ab987..ef1a4af064 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -337,10 +337,12 @@ int tsdbLockRepo(STsdbRepo *pRepo) { terrno = TAOS_SYSTEM_ERROR(code); return -1; } + pRepo->repoLocked = true; return 0; } int tsdbUnlockRepo(STsdbRepo *pRepo) { + pRepo->repoLocked = false; int code = pthread_mutex_unlock(&pRepo->mutex); if (code != 0) { tsdbError("vgId:%d failed to unlock tsdb since %s", REPO_ID(pRepo), strerror(errno)); @@ -679,6 +681,8 @@ static STsdbRepo *tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg) { goto _err; } + pRepo->repoLocked = false; + pRepo->rootDir = strdup(rootDir); if (pRepo->rootDir == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 4ac5f5a5f2..c848b7644b 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -20,60 +20,68 @@ // ---------------- INTERNAL FUNCTIONS ---------------- int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { - STsdbCfg *pCfg = &pRepo->config; - int32_t level = 0; - int32_t headSize = 0; - TSKEY key = dataRowKey(row); + STsdbCfg * pCfg = &pRepo->config; + int32_t level = 0; + int32_t headSize = 0; + TSKEY key = dataRowKey(row); + SMemTable * pMemTable = pRepo->mem; + STableData *pTableData = NULL; + int bytes = 0; + + if (pMemTable != NULL && pMemTable->tData[TABLE_TID(pTable)] != NULL && + pMemTable->tData[TABLE_TID(pTable)]->uid == TALBE_UID(pTable)) { + pTableData = pMemTable->tData[TABLE_TID(pTable)]; + } - // TODO - tSkipListNewNodeInfo(pRepo->mem->tData[TABLE_TID(pTable)]->pData, &level, &headSize); + tSkipListNewNodeInfo(pTableData, &level, &headSize); - // TODO: for duplicate keys, you do not need to allocate memory here - SSkipListNode *pNode = tsdbAllocBytes(pRepo, headSize + dataRowLen(row)); + bytes = headSize + dataRowLen(row); + SSkipListNode *pNode = tsdbAllocBytes(pRepo, bytes); if (pNode == NULL) { - tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s since %s", REPO_ID(pRepo), key, - TABLE_CHAR_NAME(pTable), tstrerror(terrno)); + tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s while allocate %d bytes since %s", + REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), bytes, tstrerror(terrno)); return -1; } - - SMemTable *pMemTable = pRepo->mem; - ASSERT(pMemTable != NULL); - pNode->level = level; dataRowCpy(SL_GET_NODE_DATA(pNode), row); - STableData *pTableData = pMemTable->tData[TABLE_TID(pTable)]; - if (pTableData == NULL) { - pTableData = tsdbNewTableData(pCfg); + // Operations above may change pRepo->mem, retake those values + ASSERT(pRepo->mem != NULL); + pMemTable = pRepo->mem; + pTableData = pMemTable->tData[TABLE_TID(pTable)]; + + if (pTableData == NULL || pTableData->uid != TALBE_UID(pTable)) { + if (pTableData != NULL) { // destroy the table skiplist (may have race condition problem) + pMemTable->tData[TABLE_TID(pTable)] = NULL; + tsdbFreeTableData(pTableData); + } + pTableData = tsdbNewTableData(pCfg, pTable); if (pTableData == NULL) { - tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s since %s", REPO_ID(pRepo), key, - TABLE_CHAR_NAME(pTable), tstrerror(terrno)); + tsdbError("vgId:%d failed to insert row with key %" PRId64 + " to table %s while create new table data object since %s", + REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), tstrerror(terrno)); + tsdbFreeBytes(pRepo, (void *)pNode, bytes); return -1; } pRepo->mem->tData[TABLE_TID(pTable)] = pTableData; } - ASSERT(pTableData != NULL); - - if (pTableData->uid != TALBE_UID(pTable)) { - // TODO - } + ASSERT(pTableData != NULL) && pTableData->uid == TALBE_UID(pTable); if (tSkipListPut(pTableData->pData, pNode) == NULL) { - tsdbFreeBytes(pRepo, (void *)pNode, headSize + dataRowLen); - return 0; - } - - if (pMemTable->keyFirst > key) pMemTable->keyFirst = key; - if (pMemTable->keyLast < key) pMemTable->keyLast = key; - pMemTable->numOfRows++; + tsdbFreeBytes(pRepo, (void *)pNode, bytes); + } else { + if (pMemTable->keyFirst > key) pMemTable->keyFirst = key; + if (pMemTable->keyLast < key) pMemTable->keyLast = key; + pMemTable->numOfRows++; - if (pTableData->keyFirst > key) pTableData->keyFirst = key; - if (pTableData->keyLast < key) pTableData->keyLast = key; - pTableData->numOfRows++; + if (pTableData->keyFirst > key) pTableData->keyFirst = key; + if (pTableData->keyLast < key) pTableData->keyLast = key; + pTableData->numOfRows++; - ASSERT(pTableData->numOfRows == tSkipListGetSize(pTableData->pData)); + ASSERT(pTableData->numOfRows == tSkipListGetSize(pTableData->pData)); + } tsdbTrace("vgId:%d a row is inserted to table %s tid %d uid %" PRIu64 " key %" PRIu64, REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TALBE_UID(pTable), key); @@ -82,12 +90,14 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { } int tsdbRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { + ASSERT(IS_REPO_LOCKED(pRepo)); ASSERT(pMemTable != NULL); T_REF_INC(pMemTable); } // Need to lock the repository int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { + ASSERT(IS_REPO_LOCKED(pRepo)); ASSERT(pMemTable != NULL); if (T_REF_DEC(pMemTable) == 0) { @@ -117,7 +127,7 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { // ---------------- LOCAL FUNCTIONS ---------------- static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo) { - if (pRepo->mem == NULL) return NULL; + if (pRepo == NULL || pRepo->mem == NULL) return NULL; SListNode *pNode = listTail(pRepo->mem); if (pNode == NULL) return NULL; @@ -258,13 +268,14 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) { pTableData->keyLast = 0; pTableData->numOfRows = 0; - pTableData->pData = tSkipListCreate(TSDB_DATA_SKIPLIST_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, 0, ); + pTableData->pData = tSkipListCreate(TSDB_DATA_SKIPLIST_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, + TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, 0, tsdbGetTsTupleKey); if (pTableData->pData == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; goto _err; } - // TODO + // TODO: operation here should not be here, remove it pTableData->pData->level = 1; return pTableData; @@ -279,4 +290,6 @@ static void tsdbFreeTableData(STableData *pTableData) { tSkipListDestroy(pTableData->pData); free(pTableData); } -} \ No newline at end of file +} + +static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple(data); } \ No newline at end of file -- GitLab