From 6eba0ff9f0381f4a33fe4e44c05ead620ccfadef Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 18 Jun 2020 02:02:38 +0000 Subject: [PATCH] TD-353 --- src/tsdb/inc/tsdbMain.h | 20 +++++++-- src/tsdb/src/tsdbMemTable.c | 87 +++++++++++++++++++++++++++++-------- src/tsdb/src/tsdbMeta.c | 57 ++++++++++++++++++------ src/tsdb/src/tsdbRWHelper.c | 1 + 4 files changed, 128 insertions(+), 37 deletions(-) diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 44e8002ac1..2602544bb4 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -110,6 +110,17 @@ typedef struct { SList* bufBlockList; } SMemTable; +enum { TSDB_UPDATE_META, TSDB_DROP_META }; +typedef struct __attribute__((packed)){ + char act; + uint64_t uid; +} SActObj; + +typedef struct { + int len; + char cont[]; +} SActCont; + // ------------------ tsdbFile.c extern const char* tsdbFileSuffix[]; typedef enum { @@ -302,10 +313,11 @@ void tsdbCloseBufPool(STsdbRepo* pRepo); SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo); // ------------------ tsdbMemTable.c -int tsdbInsertRowToMem(STsdbRepo* pRepo, SDataRow row, STable* pTable); -int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); -int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); -int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem); +int tsdbInsertRowToMem(STsdbRepo* pRepo, SDataRow row, STable* pTable); +int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); +int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); +int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem); +void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes); // ------------------ tsdbFile.c #define TSDB_KEY_FILEID(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile)) diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index ef9cc1b07e..4b8c76702e 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -25,7 +25,6 @@ typedef struct { static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo); -static void * tsdbAllocBytes(STsdbRepo *pRepo, int bytes); static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes); static SMemTable * tsdbNewMemTable(STsdbCfg *pCfg); static void tsdbFreeMemTable(SMemTable *pMemTable); @@ -33,6 +32,7 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable); static void tsdbFreeTableData(STableData *pTableData); static char * tsdbGetTsTupleKey(const void *data); static void * tsdbCommitData(void *arg); +static int tsdbCommitMeta(STsdbRepo *pRepo); static void tsdbEndCommit(STsdbRepo *pRepo); static TSKEY tsdbNextIterKey(SCommitIter *pIter); static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey); @@ -170,21 +170,7 @@ int tsdbTakeMemSnapshot(STsdbRepo *pRepo, SMemTable **pMem, SMemTable **pIMem) { return 0; } -// ---------------- LOCAL FUNCTIONS ---------------- -static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo) { - ASSERT(pRepo != NULL); - if (pRepo->mem == NULL) return NULL; - - SListNode *pNode = listTail(pRepo->mem->bufBlockList); - if (pNode == NULL) return NULL; - - STsdbBufBlock *pBufBlock = NULL; - tdListNodeGetData(pRepo->mem->bufBlockList, pNode, (void *)(&pBufBlock)); - - return pBufBlock; -} - -static void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { +void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { STsdbCfg * pCfg = &pRepo->config; STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo); int code = 0; @@ -256,6 +242,20 @@ static void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { return ptr; } +// ---------------- LOCAL FUNCTIONS ---------------- +static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo) { + ASSERT(pRepo != NULL); + if (pRepo->mem == NULL) return NULL; + + SListNode *pNode = listTail(pRepo->mem->bufBlockList); + if (pNode == NULL) return NULL; + + STsdbBufBlock *pBufBlock = NULL; + tdListNodeGetData(pRepo->mem->bufBlockList, pNode, (void *)(&pBufBlock)); + + return pBufBlock; +} + static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes) { STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo); ASSERT(pBufBlock != NULL); @@ -396,10 +396,13 @@ static void *tsdbCommitData(void *arg) { } } - // TODO: Do retention actions - tsdbFitRetention(pRepo); + // Commit to update meta file + if (tsdbCommitMeta(pRepo) < 0) { + tsdbError("vgId:%d failed to commit data while committing meta data since %s", REPO_ID(pRepo), tstrerror(terrno)); + goto _exit; + } - // TODO: Commit action meta data + tsdbFitRetention(pRepo); _exit: tdFreeDataCols(pDataCols); @@ -411,6 +414,52 @@ _exit: return NULL; } +static int tsdbCommitMeta(STsdbRepo *pRepo) { + SMemTable *pMem = pRepo->imem; + STsdbMeta *pMeta = pRepo->tsdbMeta; + SActObj * pAct = NULL; + SActCont * pCont = NULL; + + if (listNEles(pMem->actList) > 0) { + if (tdKVStoreStartCommit(pMeta->pStore) < 0) { + tsdbError("vgId:%d failed to commit data while start commit meta since %s", REPO_ID(pRepo), tstrerror(terrno)); + goto _err; + } + + SListNode *pNode = NULL; + + while ((pNode = tdListPopHead(pMem->actList)) != NULL) { + pAct = (SActObj *)pNode->data; + if (pAct->act == TSDB_UPDATE_META) { + pCont = (SActCont *)POINTER_SHIFT(pAct, sizeof(SActObj)); + if (tdUpdateKVStoreRecord(pMeta->pStore, pAct->uid, (void *)(pCont->cont), pCont->len) < 0) { + tsdbError("vgId:%d failed to update meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid, + tstrerror(terrno)); + goto _err; + } + } else if (pAct->act == TSDB_DROP_META) { + if (tdDropKVStoreRecord(pMeta->pStore, pAct->uid) < 0) { + tsdbError("vgId:%d failed to drop meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid, + tstrerror(terrno)); + goto _err; + } + } else { + ASSERT(false); + } + } + + if (tdKVStoreEndCommit(pMeta->pStore) < 0) { + tsdbError("vgId:%d failed to commit data while end commit meta since %s", REPO_ID(pRepo), tstrerror(terrno)); + goto _err; + } + } + + return 0; + +_err: + return -1; +} + static void tsdbEndCommit(STsdbRepo *pRepo) { ASSERT(pRepo->commit == 1); tsdbLockRepo(pRepo); diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 9eb5d10371..98c390f5a5 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -46,6 +46,8 @@ static int tsdbEncodeTableName(void **buf, tstr *name); static void * tsdbDecodeTableName(void *buf, tstr **name); static int tsdbEncodeTable(void **buf, STable *pTable); static void * tsdbDecodeTable(void *buf, STable **pRTable); +static int tsdbGetTableEncodeSize(int8_t act, STable *pTable); +static void * tsdbInsertTableAct(STsdbRepo *pRepo, int8_t act, void *buf, STable *pTable); // ------------------ OUTER FUNCTIONS ------------------ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) { @@ -85,20 +87,18 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) { } if (tsdbAddTableToMeta(pRepo, table, true) < 0) goto _err; - int tlen = tsdbEncodeTable(NULL, pTable); - ASSERT(tlen > 0); - - // // Write to meta file - // int bufLen = 0; - // char *buf = malloc(1024 * 1024); - // if (newSuper) { - // tsdbEncodeTable(super, buf, &bufLen); - // tsdbInsertMetaRecord(pMeta->mfh, super->tableId.uid, buf, bufLen); - // } - - // tsdbEncodeTable(table, buf, &bufLen); - // tsdbInsertMetaRecord(pMeta->mfh, table->tableId.uid, buf, bufLen); - // tfree(buf); + // Write to memtable action + int tlen1 = (newSuper) ? tsdbGetTableEncodeSize(TSDB_UPDATE_META, super) : 0; + int tlen2 = tsdbGetTableEncodeSize(TSDB_UPDATE_META, table); + int tlen = tlen1 + tlen2; + void *buf = tsdbAllocBytes(pRepo, tlen); + ASSERT(buf != NULL); + if (newSuper) { + void *pBuf = tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, super); + ASSERT(POINTER_DISTANCE(pBuf, buf) == tlen1); + buf = pBuf; + } + tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, super); return 0; @@ -1107,4 +1107,33 @@ static void *tsdbDecodeTable(void *buf, STable **pRTable) { *pRTable = pTable; return buf; +} + +static int tsdbGetTableEncodeSize(int8_t act, STable *pTable) { + int tlen = sizeof(SListNode) + sizeof(SActObj); + if (act == TSDB_UPDATE_META) tlen += (sizeof(SActCont) + tsdbEncodeTable(NULL, pTable) + sizeof(TSCKSUM)); + + return tlen; +} + +static void *tsdbInsertTableAct(STsdbRepo *pRepo, int8_t act, void *buf, STable *pTable) { + SListNode *pNode = (SListNode *)buf; + SActObj * pAct = (SActObj *)(pNode->data); + SActCont * pCont = (SActCont *)POINTER_SHIFT(pAct, sizeof(*pAct)); + void * pBuf = (void *)pCont; + + pNode->prev = pNode->next = NULL; + pAct->act = act; + pAct->uid = TABLE_UID(pTable); + + if (act == TSDB_UPDATE_META) { + pBuf = (void *)(pCont->cont); + pCont->len = tsdbEncodeTable(&pBuf, pTable) + sizeof(TSCKSUM); + taosCalcChecksumAppend(0, (uint8_t *)pCont->cont, pCont->len); + pBuf = POINTER_SHIFT(pBuf, sizeof(TSCKSUM)); + } + + tdListAppendNode(pRepo->mem->actList, pNode); + + return pBuf; } \ No newline at end of file diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index ac3358cf2a..08fe69b0ee 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -1136,6 +1136,7 @@ static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t // Init block part if (tsdbInitHelperBlock(pHelper) < 0) goto _err; + // TODO: pMeta->maxRowBytes and pMeta->maxCols may change here causing invalid write pHelper->pBuffer = tmalloc(sizeof(SCompData) + (sizeof(SCompCol) + sizeof(TSCKSUM) + COMP_OVERFLOW_BYTES) * pMeta->maxCols + pMeta->maxRowBytes * pCfg->maxRowsPerFileBlock + sizeof(TSCKSUM)); -- GitLab