提交 6413285b 编写于 作者: H Hongze Cheng

TD-353

上级 c2d352b9
...@@ -254,7 +254,7 @@ typedef struct { ...@@ -254,7 +254,7 @@ typedef struct {
// ------------------ tsdbMain.c // ------------------ tsdbMain.c
typedef struct { typedef struct {
int8_t state; int8_t state;
char* rootDir; char* rootDir;
STsdbCfg config; STsdbCfg config;
...@@ -265,9 +265,10 @@ typedef struct { ...@@ -265,9 +265,10 @@ typedef struct {
SMemTable* mem; SMemTable* mem;
SMemTable* imem; SMemTable* imem;
STsdbFileH* tsdbFileH; STsdbFileH* tsdbFileH;
pthread_mutex_t mutex;
int commit; int commit;
pthread_t commitThread; pthread_t commitThread;
pthread_mutex_t mutex;
bool repoLocked;
} STsdbRepo; } STsdbRepo;
// Operations // Operations
...@@ -309,6 +310,7 @@ void tsdbFreeFileH(STsdbFileH* pFileH); ...@@ -309,6 +310,7 @@ void tsdbFreeFileH(STsdbFileH* pFileH);
// ------------------ tsdbMain.c // ------------------ tsdbMain.c
#define REPO_ID(r) (r)->config.tsdbId #define REPO_ID(r) (r)->config.tsdbId
#define IS_REPO_LOCKED(r) (r)->repoLocked
char* tsdbGetMetaFileName(char* rootDir); char* tsdbGetMetaFileName(char* rootDir);
int tsdbLockRepo(STsdbRepo* pRepo); int tsdbLockRepo(STsdbRepo* pRepo);
......
...@@ -337,10 +337,12 @@ int tsdbLockRepo(STsdbRepo *pRepo) { ...@@ -337,10 +337,12 @@ int tsdbLockRepo(STsdbRepo *pRepo) {
terrno = TAOS_SYSTEM_ERROR(code); terrno = TAOS_SYSTEM_ERROR(code);
return -1; return -1;
} }
pRepo->repoLocked = true;
return 0; return 0;
} }
int tsdbUnlockRepo(STsdbRepo *pRepo) { int tsdbUnlockRepo(STsdbRepo *pRepo) {
pRepo->repoLocked = false;
int code = pthread_mutex_unlock(&pRepo->mutex); int code = pthread_mutex_unlock(&pRepo->mutex);
if (code != 0) { if (code != 0) {
tsdbError("vgId:%d failed to unlock tsdb since %s", REPO_ID(pRepo), strerror(errno)); tsdbError("vgId:%d failed to unlock tsdb since %s", REPO_ID(pRepo), strerror(errno));
...@@ -679,6 +681,8 @@ static STsdbRepo *tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg) { ...@@ -679,6 +681,8 @@ static STsdbRepo *tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg) {
goto _err; goto _err;
} }
pRepo->repoLocked = false;
pRepo->rootDir = strdup(rootDir); pRepo->rootDir = strdup(rootDir);
if (pRepo->rootDir == NULL) { if (pRepo->rootDir == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
......
...@@ -20,60 +20,68 @@ ...@@ -20,60 +20,68 @@
// ---------------- INTERNAL FUNCTIONS ---------------- // ---------------- INTERNAL FUNCTIONS ----------------
int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
STsdbCfg *pCfg = &pRepo->config; STsdbCfg * pCfg = &pRepo->config;
int32_t level = 0; int32_t level = 0;
int32_t headSize = 0; int32_t headSize = 0;
TSKEY key = dataRowKey(row); TSKEY key = dataRowKey(row);
SMemTable * pMemTable = pRepo->mem;
STableData *pTableData = NULL;
int bytes = 0;
if (pMemTable != NULL && pMemTable->tData[TABLE_TID(pTable)] != NULL &&
pMemTable->tData[TABLE_TID(pTable)]->uid == TALBE_UID(pTable)) {
pTableData = pMemTable->tData[TABLE_TID(pTable)];
}
// TODO tSkipListNewNodeInfo(pTableData, &level, &headSize);
tSkipListNewNodeInfo(pRepo->mem->tData[TABLE_TID(pTable)]->pData, &level, &headSize);
// TODO: for duplicate keys, you do not need to allocate memory here bytes = headSize + dataRowLen(row);
SSkipListNode *pNode = tsdbAllocBytes(pRepo, headSize + dataRowLen(row)); SSkipListNode *pNode = tsdbAllocBytes(pRepo, bytes);
if (pNode == NULL) { if (pNode == NULL) {
tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s since %s", REPO_ID(pRepo), key, tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s while allocate %d bytes since %s",
TABLE_CHAR_NAME(pTable), tstrerror(terrno)); REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), bytes, tstrerror(terrno));
return -1; return -1;
} }
SMemTable *pMemTable = pRepo->mem;
ASSERT(pMemTable != NULL);
pNode->level = level; pNode->level = level;
dataRowCpy(SL_GET_NODE_DATA(pNode), row); dataRowCpy(SL_GET_NODE_DATA(pNode), row);
STableData *pTableData = pMemTable->tData[TABLE_TID(pTable)]; // Operations above may change pRepo->mem, retake those values
if (pTableData == NULL) { ASSERT(pRepo->mem != NULL);
pTableData = tsdbNewTableData(pCfg); pMemTable = pRepo->mem;
pTableData = pMemTable->tData[TABLE_TID(pTable)];
if (pTableData == NULL || pTableData->uid != TALBE_UID(pTable)) {
if (pTableData != NULL) { // destroy the table skiplist (may have race condition problem)
pMemTable->tData[TABLE_TID(pTable)] = NULL;
tsdbFreeTableData(pTableData);
}
pTableData = tsdbNewTableData(pCfg, pTable);
if (pTableData == NULL) { if (pTableData == NULL) {
tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s since %s", REPO_ID(pRepo), key, tsdbError("vgId:%d failed to insert row with key %" PRId64
TABLE_CHAR_NAME(pTable), tstrerror(terrno)); " to table %s while create new table data object since %s",
REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), tstrerror(terrno));
tsdbFreeBytes(pRepo, (void *)pNode, bytes);
return -1; return -1;
} }
pRepo->mem->tData[TABLE_TID(pTable)] = pTableData; pRepo->mem->tData[TABLE_TID(pTable)] = pTableData;
} }
ASSERT(pTableData != NULL); ASSERT(pTableData != NULL) && pTableData->uid == TALBE_UID(pTable);
if (pTableData->uid != TALBE_UID(pTable)) {
// TODO
}
if (tSkipListPut(pTableData->pData, pNode) == NULL) { if (tSkipListPut(pTableData->pData, pNode) == NULL) {
tsdbFreeBytes(pRepo, (void *)pNode, headSize + dataRowLen); tsdbFreeBytes(pRepo, (void *)pNode, bytes);
return 0; } else {
} if (pMemTable->keyFirst > key) pMemTable->keyFirst = key;
if (pMemTable->keyLast < key) pMemTable->keyLast = key;
if (pMemTable->keyFirst > key) pMemTable->keyFirst = key; pMemTable->numOfRows++;
if (pMemTable->keyLast < key) pMemTable->keyLast = key;
pMemTable->numOfRows++;
if (pTableData->keyFirst > key) pTableData->keyFirst = key; if (pTableData->keyFirst > key) pTableData->keyFirst = key;
if (pTableData->keyLast < key) pTableData->keyLast = key; if (pTableData->keyLast < key) pTableData->keyLast = key;
pTableData->numOfRows++; pTableData->numOfRows++;
ASSERT(pTableData->numOfRows == tSkipListGetSize(pTableData->pData)); ASSERT(pTableData->numOfRows == tSkipListGetSize(pTableData->pData));
}
tsdbTrace("vgId:%d a row is inserted to table %s tid %d uid %" PRIu64 " key %" PRIu64, REPO_ID(pRepo), tsdbTrace("vgId:%d a row is inserted to table %s tid %d uid %" PRIu64 " key %" PRIu64, REPO_ID(pRepo),
TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TALBE_UID(pTable), key); TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TALBE_UID(pTable), key);
...@@ -82,12 +90,14 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { ...@@ -82,12 +90,14 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
} }
int tsdbRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { int tsdbRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
ASSERT(IS_REPO_LOCKED(pRepo));
ASSERT(pMemTable != NULL); ASSERT(pMemTable != NULL);
T_REF_INC(pMemTable); T_REF_INC(pMemTable);
} }
// Need to lock the repository // Need to lock the repository
int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
ASSERT(IS_REPO_LOCKED(pRepo));
ASSERT(pMemTable != NULL); ASSERT(pMemTable != NULL);
if (T_REF_DEC(pMemTable) == 0) { if (T_REF_DEC(pMemTable) == 0) {
...@@ -117,7 +127,7 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { ...@@ -117,7 +127,7 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
// ---------------- LOCAL FUNCTIONS ---------------- // ---------------- LOCAL FUNCTIONS ----------------
static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo) { static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo) {
if (pRepo->mem == NULL) return NULL; if (pRepo == NULL || pRepo->mem == NULL) return NULL;
SListNode *pNode = listTail(pRepo->mem); SListNode *pNode = listTail(pRepo->mem);
if (pNode == NULL) return NULL; if (pNode == NULL) return NULL;
...@@ -258,13 +268,14 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) { ...@@ -258,13 +268,14 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) {
pTableData->keyLast = 0; pTableData->keyLast = 0;
pTableData->numOfRows = 0; pTableData->numOfRows = 0;
pTableData->pData = tSkipListCreate(TSDB_DATA_SKIPLIST_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, 0, ); pTableData->pData = tSkipListCreate(TSDB_DATA_SKIPLIST_LEVEL, TSDB_DATA_TYPE_TIMESTAMP,
TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, 0, tsdbGetTsTupleKey);
if (pTableData->pData == NULL) { if (pTableData->pData == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err; goto _err;
} }
// TODO // TODO: operation here should not be here, remove it
pTableData->pData->level = 1; pTableData->pData->level = 1;
return pTableData; return pTableData;
...@@ -279,4 +290,6 @@ static void tsdbFreeTableData(STableData *pTableData) { ...@@ -279,4 +290,6 @@ static void tsdbFreeTableData(STableData *pTableData) {
tSkipListDestroy(pTableData->pData); tSkipListDestroy(pTableData->pData);
free(pTableData); free(pTableData);
} }
} }
\ No newline at end of file
static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple(data); }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册