提交 081e7120 编写于 作者: H Hongze Cheng

make action atomic

上级 cae5edea
...@@ -123,6 +123,7 @@ typedef struct { ...@@ -123,6 +123,7 @@ typedef struct {
int32_t maxTables; int32_t maxTables;
STableData** tData; STableData** tData;
SList* actList; SList* actList;
SList* extraBuffList;
SList* bufBlockList; SList* bufBlockList;
} SMemTable; } SMemTable;
...@@ -392,6 +393,8 @@ static FORCE_INLINE STSchema *tsdbGetTableTagSchema(STable *pTable) { ...@@ -392,6 +393,8 @@ static FORCE_INLINE STSchema *tsdbGetTableTagSchema(STable *pTable) {
} }
// ------------------ tsdbBuffer.c // ------------------ tsdbBuffer.c
#define TSDB_BUFFER_RESERVE 1024 // Reseve 1K as commit threshold
STsdbBufPool* tsdbNewBufPool(); STsdbBufPool* tsdbNewBufPool();
void tsdbFreeBufPool(STsdbBufPool* pBufPool); void tsdbFreeBufPool(STsdbBufPool* pBufPool);
int tsdbOpenBufPool(STsdbRepo* pRepo); int tsdbOpenBufPool(STsdbRepo* pRepo);
...@@ -425,6 +428,19 @@ static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator* pIter) { ...@@ -425,6 +428,19 @@ static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator* pIter) {
return dataRowKey(row); return dataRowKey(row);
} }
static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) {
ASSERT(pRepo != NULL);
if (pRepo->mem == NULL) return NULL;
SListNode* pNode = listTail(pRepo->mem->bufBlockList);
if (pNode == NULL) return NULL;
STsdbBufBlock* pBufBlock = NULL;
tdListNodeGetData(pRepo->mem->bufBlockList, pNode, (void*)(&pBufBlock));
return pBufBlock;
}
// ------------------ tsdbFile.c // ------------------ tsdbFile.c
#define TSDB_KEY_FILEID(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile)) #define TSDB_KEY_FILEID(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile))
#define TSDB_MAX_FILE(keep, daysPerFile) ((keep) / (daysPerFile) + 3) #define TSDB_MAX_FILE(keep, daysPerFile) ((keep) / (daysPerFile) + 3)
...@@ -523,6 +539,7 @@ char* tsdbGetDataDirName(char* rootDir); ...@@ -523,6 +539,7 @@ char* tsdbGetDataDirName(char* rootDir);
int tsdbGetNextMaxTables(int tid); int tsdbGetNextMaxTables(int tid);
STsdbMeta* tsdbGetMeta(TSDB_REPO_T* pRepo); STsdbMeta* tsdbGetMeta(TSDB_REPO_T* pRepo);
STsdbFileH* tsdbGetFile(TSDB_REPO_T* pRepo); STsdbFileH* tsdbGetFile(TSDB_REPO_T* pRepo);
int tsdbCheckCommit(STsdbRepo* pRepo);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -192,6 +192,8 @@ int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg * ...@@ -192,6 +192,8 @@ int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *
} }
if (pRsp != NULL) pRsp->affectedRows = htonl(affectedrows); if (pRsp != NULL) pRsp->affectedRows = htonl(affectedrows);
if (tsdbCheckCommit(pRepo) < 0) return -1;
return 0; return 0;
} }
...@@ -387,6 +389,21 @@ int tsdbGetNextMaxTables(int tid) { ...@@ -387,6 +389,21 @@ int tsdbGetNextMaxTables(int tid) {
return maxTables + 1; return maxTables + 1;
} }
int tsdbCheckCommit(STsdbRepo *pRepo) {
ASSERT(pRepo->mem != NULL);
STsdbCfg *pCfg = &(pRepo->config);
STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo);
ASSERT(pBufBlock != NULL);
if ((pRepo->mem->extraBuffList != NULL) ||
((listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) && (pBufBlock->remain < TSDB_BUFFER_RESERVE))) {
// trigger commit
if (tsdbAsyncCommit(pRepo) < 0) return -1;
}
return 0;
}
STsdbMeta * tsdbGetMeta(TSDB_REPO_T *pRepo) { return ((STsdbRepo *)pRepo)->tsdbMeta; } STsdbMeta * tsdbGetMeta(TSDB_REPO_T *pRepo) { return ((STsdbRepo *)pRepo)->tsdbMeta; }
STsdbFileH * tsdbGetFile(TSDB_REPO_T *pRepo) { return ((STsdbRepo *)pRepo)->tsdbFileH; } STsdbFileH * tsdbGetFile(TSDB_REPO_T *pRepo) { return ((STsdbRepo *)pRepo)->tsdbFileH; }
STsdbRepoInfo *tsdbGetStatus(TSDB_REPO_T *pRepo) { return NULL; } STsdbRepoInfo *tsdbGetStatus(TSDB_REPO_T *pRepo) { return NULL; }
......
...@@ -18,8 +18,6 @@ ...@@ -18,8 +18,6 @@
#define TSDB_DATA_SKIPLIST_LEVEL 5 #define TSDB_DATA_SKIPLIST_LEVEL 5
static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo);
static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes); static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes);
static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo); static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo);
static void tsdbFreeMemTable(SMemTable *pMemTable); static void tsdbFreeMemTable(SMemTable *pMemTable);
...@@ -202,44 +200,59 @@ void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemTable *pMem, SMemTable *pIMem) ...@@ -202,44 +200,59 @@ void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemTable *pMem, SMemTable *pIMem)
void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
STsdbCfg * pCfg = &pRepo->config; STsdbCfg * pCfg = &pRepo->config;
STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo); STsdbBufBlock *pBufBlock = NULL;
void * ptr = NULL;
if (pBufBlock != NULL && pBufBlock->remain < bytes) {
if (listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) { // need to commit mem
if (tsdbAsyncCommit(pRepo) < 0) return NULL;
} else {
if (tsdbLockRepo(pRepo) < 0) return NULL;
SListNode *pNode = tsdbAllocBufBlockFromPool(pRepo);
tdListAppendNode(pRepo->mem->bufBlockList, pNode);
if (tsdbUnlockRepo(pRepo) < 0) return NULL;
}
}
// Either allocate from buffer blocks or from SYSTEM memory pool
if (pRepo->mem == NULL) { if (pRepo->mem == NULL) {
SMemTable *pMemTable = tsdbNewMemTable(pRepo); SMemTable *pMemTable = tsdbNewMemTable(pRepo);
if (pMemTable == NULL) return NULL; if (pMemTable == NULL) return NULL;
pRepo->mem = pMemTable;
}
ASSERT(pRepo->mem != NULL);
if (tsdbLockRepo(pRepo) < 0) { pBufBlock = tsdbGetCurrBufBlock(pRepo);
tsdbFreeMemTable(pMemTable); if ((pRepo->mem->extraBuffList != NULL) ||
((listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) && (pBufBlock->remain < bytes))) {
// allocate from SYSTEM buffer pool
if (pRepo->mem->extraBuffList == NULL) {
pRepo->mem->extraBuffList = tdListNew(0);
if (pRepo->mem->extraBuffList == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
}
}
ASSERT(pRepo->mem->extraBuffList != NULL);
SListNode *pNode = (SListNode *)malloc(sizeof(SListNode) + bytes);
if (pNode == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL; return NULL;
} }
SListNode *pNode = tsdbAllocBufBlockFromPool(pRepo); pNode->next = pNode->prev = NULL;
tdListAppendNode(pMemTable->bufBlockList, pNode); tdListAppend(pRepo->mem->extraBuffList, pNode);
pRepo->mem = pMemTable; ptr = (void *)(pNode->data);
tsdbTrace("vgId:%d allocate %d bytes from SYSTEM buffer block", REPO_ID(pRepo), bytes);
} else { // allocate from TSDB buffer pool
if (pBufBlock == NULL || pBufBlock->remain < bytes) {
ASSERT(listNEles(pRepo->mem->bufBlockList) < pCfg->totalBlocks / 3);
if (tsdbLockRepo(pRepo) < 0) return NULL;
SListNode *pNode = tsdbAllocBufBlockFromPool(pRepo);
tdListAppendNode(pRepo->mem->bufBlockList, pNode);
if (tsdbUnlockRepo(pRepo) < 0) return NULL;
pBufBlock = tsdbGetCurrBufBlock(pRepo);
}
if (tsdbUnlockRepo(pRepo) < 0) return NULL; ASSERT(pBufBlock->remain >= bytes);
ptr = POINTER_SHIFT(pBufBlock->data, pBufBlock->offset);
pBufBlock->offset += bytes;
pBufBlock->remain -= bytes;
tsdbTrace("vgId:%d allocate %d bytes from TSDB buffer block, nBlocks %d offset %d remain %d", REPO_ID(pRepo), bytes,
listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain);
} }
pBufBlock = tsdbGetCurrBufBlock(pRepo);
ASSERT(pBufBlock->remain >= bytes);
void *ptr = POINTER_SHIFT(pBufBlock->data, pBufBlock->offset);
pBufBlock->offset += bytes;
pBufBlock->remain -= bytes;
tsdbTrace("vgId:%d allocate %d bytes from buffer block, nBlocks %d offset %d remain %d", REPO_ID(pRepo), bytes,
listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain);
return ptr; return ptr;
} }
...@@ -340,27 +353,23 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey ...@@ -340,27 +353,23 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
} }
// ---------------- LOCAL FUNCTIONS ---------------- // ---------------- LOCAL FUNCTIONS ----------------
static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo) {
ASSERT(pRepo != NULL);
if (pRepo->mem == NULL) return NULL;
SListNode *pNode = listTail(pRepo->mem->bufBlockList);
if (pNode == NULL) return NULL;
STsdbBufBlock *pBufBlock = NULL;
tdListNodeGetData(pRepo->mem->bufBlockList, pNode, (void *)(&pBufBlock));
return pBufBlock;
}
static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes) { static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes) {
STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo); ASSERT(pRepo->mem != NULL);
ASSERT(pBufBlock != NULL); if (pRepo->mem->extraBuffList == NULL) {
pBufBlock->offset -= bytes; STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo);
pBufBlock->remain += bytes; ASSERT(pBufBlock != NULL);
ASSERT(ptr == POINTER_SHIFT(pBufBlock->data, pBufBlock->offset)); pBufBlock->offset -= bytes;
tsdbTrace("vgId:%d return %d bytes to buffer block, nBlocks %d offset %d remain %d", REPO_ID(pRepo), bytes, pBufBlock->remain += bytes;
listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain); ASSERT(ptr == POINTER_SHIFT(pBufBlock->data, pBufBlock->offset));
tsdbTrace("vgId:%d free %d bytes to TSDB buffer pool, nBlocks %d offset %d remain %d", REPO_ID(pRepo), bytes,
listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain);
} else {
SListNode *pNode = (SListNode *)POINTER_SHIFT(ptr, -sizeof(SListNode));
ASSERT(listTail(pRepo->mem->extraBuffList) == pNode);
tdListPopNode(pRepo->mem->extraBuffList, pNode);
free(pNode);
tsdbTrace("vgId:%d free %d bytes to SYSTEM buffer pool", REPO_ID(pRepo), bytes);
}
} }
static SMemTable* tsdbNewMemTable(STsdbRepo *pRepo) { static SMemTable* tsdbNewMemTable(STsdbRepo *pRepo) {
...@@ -409,6 +418,7 @@ static void tsdbFreeMemTable(SMemTable* pMemTable) { ...@@ -409,6 +418,7 @@ static void tsdbFreeMemTable(SMemTable* pMemTable) {
ASSERT((pMemTable->bufBlockList == NULL) ? true : (listNEles(pMemTable->bufBlockList) == 0)); ASSERT((pMemTable->bufBlockList == NULL) ? true : (listNEles(pMemTable->bufBlockList) == 0));
ASSERT((pMemTable->actList == NULL) ? true : (listNEles(pMemTable->actList) == 0)); ASSERT((pMemTable->actList == NULL) ? true : (listNEles(pMemTable->actList) == 0));
tdListFree(pMemTable->extraBuffList);
tdListFree(pMemTable->bufBlockList); tdListFree(pMemTable->bufBlockList);
tdListFree(pMemTable->actList); tdListFree(pMemTable->actList);
taosTFree(pMemTable->tData); taosTFree(pMemTable->tData);
......
...@@ -120,20 +120,23 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) { ...@@ -120,20 +120,23 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) {
tsdbUnlockRepoMeta(pRepo); tsdbUnlockRepoMeta(pRepo);
// Write to memtable action // Write to memtable action
int tlen1 = (newSuper) ? tsdbGetTableEncodeSize(TSDB_UPDATE_META, super) : 0; // TODO: refactor duplicate codes
int tlen2 = tsdbGetTableEncodeSize(TSDB_UPDATE_META, table); int tlen = 0;
int tlen = tlen1 + tlen2; void *pBuf = NULL;
void *buf = tsdbAllocBytes(pRepo, tlen);
if (buf == NULL) {
goto _err;
}
if (newSuper) { if (newSuper) {
void *pBuf = tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, super); tlen = tsdbGetTableEncodeSize(TSDB_UPDATE_META, super);
ASSERT(POINTER_DISTANCE(pBuf, buf) == tlen1); pBuf = tsdbAllocBytes(pRepo, tlen);
buf = pBuf; if (pBuf == NULL) goto _err;
void *tBuf = tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, pBuf, super);
ASSERT(POINTER_DISTANCE(tBuf, pBuf) == tlen);
} }
tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, table); tlen = tsdbGetTableEncodeSize(TSDB_UPDATE_META, table);
pBuf = tsdbAllocBytes(pRepo, tlen);
if (pBuf == NULL) goto _err;
void *tBuf = tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, pBuf, table);
ASSERT(POINTER_DISTANCE(tBuf, pBuf) == tlen);
if (tsdbCheckCommit(pRepo) < 0) return -1;
return 0; return 0;
...@@ -182,6 +185,8 @@ int tsdbDropTable(TSDB_REPO_T *repo, STableId tableId) { ...@@ -182,6 +185,8 @@ int tsdbDropTable(TSDB_REPO_T *repo, STableId tableId) {
tsdbDebug("vgId:%d, table %s is dropped! tid:%d, uid:%" PRId64, pRepo->config.tsdbId, tbname, tid, uid); tsdbDebug("vgId:%d, table %s is dropped! tid:%d, uid:%" PRId64, pRepo->config.tsdbId, tbname, tid, uid);
free(tbname); free(tbname);
if (tsdbCheckCommit(pRepo) < 0) goto _err;
return 0; return 0;
_err: _err:
...@@ -405,6 +410,8 @@ int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) { ...@@ -405,6 +410,8 @@ int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) {
} }
tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, pTable); tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, pTable);
if (tsdbCheckCommit(pRepo) < 0) return -1;
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册