提交 6eba0ff9 编写于 作者: H Hongze Cheng

TD-353

上级 3a759852
......@@ -110,6 +110,17 @@ typedef struct {
SList* bufBlockList;
} SMemTable;
enum { TSDB_UPDATE_META, TSDB_DROP_META };
typedef struct __attribute__((packed)){
char act;
uint64_t uid;
} SActObj;
typedef struct {
int len;
char cont[];
} SActCont;
// ------------------ tsdbFile.c
extern const char* tsdbFileSuffix[];
typedef enum {
......@@ -302,10 +313,11 @@ void tsdbCloseBufPool(STsdbRepo* pRepo);
SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo);
// ------------------ tsdbMemTable.c
int tsdbInsertRowToMem(STsdbRepo* pRepo, SDataRow row, STable* pTable);
int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem);
int tsdbInsertRowToMem(STsdbRepo* pRepo, SDataRow row, STable* pTable);
int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem);
void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes);
// ------------------ tsdbFile.c
#define TSDB_KEY_FILEID(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile))
......
......@@ -25,7 +25,6 @@ typedef struct {
static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo);
static void * tsdbAllocBytes(STsdbRepo *pRepo, int bytes);
static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes);
static SMemTable * tsdbNewMemTable(STsdbCfg *pCfg);
static void tsdbFreeMemTable(SMemTable *pMemTable);
......@@ -33,6 +32,7 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable);
static void tsdbFreeTableData(STableData *pTableData);
static char * tsdbGetTsTupleKey(const void *data);
static void * tsdbCommitData(void *arg);
static int tsdbCommitMeta(STsdbRepo *pRepo);
static void tsdbEndCommit(STsdbRepo *pRepo);
static TSKEY tsdbNextIterKey(SCommitIter *pIter);
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey);
......@@ -170,21 +170,7 @@ int tsdbTakeMemSnapshot(STsdbRepo *pRepo, SMemTable **pMem, SMemTable **pIMem) {
return 0;
}
// ---------------- LOCAL FUNCTIONS ----------------
static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo) {
ASSERT(pRepo != NULL);
if (pRepo->mem == NULL) return NULL;
SListNode *pNode = listTail(pRepo->mem->bufBlockList);
if (pNode == NULL) return NULL;
STsdbBufBlock *pBufBlock = NULL;
tdListNodeGetData(pRepo->mem->bufBlockList, pNode, (void *)(&pBufBlock));
return pBufBlock;
}
static void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
STsdbCfg * pCfg = &pRepo->config;
STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo);
int code = 0;
......@@ -256,6 +242,20 @@ static void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
return ptr;
}
// ---------------- LOCAL FUNCTIONS ----------------
static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo) {
ASSERT(pRepo != NULL);
if (pRepo->mem == NULL) return NULL;
SListNode *pNode = listTail(pRepo->mem->bufBlockList);
if (pNode == NULL) return NULL;
STsdbBufBlock *pBufBlock = NULL;
tdListNodeGetData(pRepo->mem->bufBlockList, pNode, (void *)(&pBufBlock));
return pBufBlock;
}
static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes) {
STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo);
ASSERT(pBufBlock != NULL);
......@@ -396,10 +396,13 @@ static void *tsdbCommitData(void *arg) {
}
}
// TODO: Do retention actions
tsdbFitRetention(pRepo);
// Commit to update meta file
if (tsdbCommitMeta(pRepo) < 0) {
tsdbError("vgId:%d failed to commit data while committing meta data since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _exit;
}
// TODO: Commit action meta data
tsdbFitRetention(pRepo);
_exit:
tdFreeDataCols(pDataCols);
......@@ -411,6 +414,52 @@ _exit:
return NULL;
}
static int tsdbCommitMeta(STsdbRepo *pRepo) {
SMemTable *pMem = pRepo->imem;
STsdbMeta *pMeta = pRepo->tsdbMeta;
SActObj * pAct = NULL;
SActCont * pCont = NULL;
if (listNEles(pMem->actList) > 0) {
if (tdKVStoreStartCommit(pMeta->pStore) < 0) {
tsdbError("vgId:%d failed to commit data while start commit meta since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
SListNode *pNode = NULL;
while ((pNode = tdListPopHead(pMem->actList)) != NULL) {
pAct = (SActObj *)pNode->data;
if (pAct->act == TSDB_UPDATE_META) {
pCont = (SActCont *)POINTER_SHIFT(pAct, sizeof(SActObj));
if (tdUpdateKVStoreRecord(pMeta->pStore, pAct->uid, (void *)(pCont->cont), pCont->len) < 0) {
tsdbError("vgId:%d failed to update meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid,
tstrerror(terrno));
goto _err;
}
} else if (pAct->act == TSDB_DROP_META) {
if (tdDropKVStoreRecord(pMeta->pStore, pAct->uid) < 0) {
tsdbError("vgId:%d failed to drop meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid,
tstrerror(terrno));
goto _err;
}
} else {
ASSERT(false);
}
}
if (tdKVStoreEndCommit(pMeta->pStore) < 0) {
tsdbError("vgId:%d failed to commit data while end commit meta since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
}
return 0;
_err:
return -1;
}
static void tsdbEndCommit(STsdbRepo *pRepo) {
ASSERT(pRepo->commit == 1);
tsdbLockRepo(pRepo);
......
......@@ -46,6 +46,8 @@ static int tsdbEncodeTableName(void **buf, tstr *name);
static void * tsdbDecodeTableName(void *buf, tstr **name);
static int tsdbEncodeTable(void **buf, STable *pTable);
static void * tsdbDecodeTable(void *buf, STable **pRTable);
static int tsdbGetTableEncodeSize(int8_t act, STable *pTable);
static void * tsdbInsertTableAct(STsdbRepo *pRepo, int8_t act, void *buf, STable *pTable);
// ------------------ OUTER FUNCTIONS ------------------
int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) {
......@@ -85,20 +87,18 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) {
}
if (tsdbAddTableToMeta(pRepo, table, true) < 0) goto _err;
int tlen = tsdbEncodeTable(NULL, pTable);
ASSERT(tlen > 0);
// // Write to meta file
// int bufLen = 0;
// char *buf = malloc(1024 * 1024);
// if (newSuper) {
// tsdbEncodeTable(super, buf, &bufLen);
// tsdbInsertMetaRecord(pMeta->mfh, super->tableId.uid, buf, bufLen);
// }
// tsdbEncodeTable(table, buf, &bufLen);
// tsdbInsertMetaRecord(pMeta->mfh, table->tableId.uid, buf, bufLen);
// tfree(buf);
// Write to memtable action
int tlen1 = (newSuper) ? tsdbGetTableEncodeSize(TSDB_UPDATE_META, super) : 0;
int tlen2 = tsdbGetTableEncodeSize(TSDB_UPDATE_META, table);
int tlen = tlen1 + tlen2;
void *buf = tsdbAllocBytes(pRepo, tlen);
ASSERT(buf != NULL);
if (newSuper) {
void *pBuf = tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, super);
ASSERT(POINTER_DISTANCE(pBuf, buf) == tlen1);
buf = pBuf;
}
tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, super);
return 0;
......@@ -1107,4 +1107,33 @@ static void *tsdbDecodeTable(void *buf, STable **pRTable) {
*pRTable = pTable;
return buf;
}
static int tsdbGetTableEncodeSize(int8_t act, STable *pTable) {
int tlen = sizeof(SListNode) + sizeof(SActObj);
if (act == TSDB_UPDATE_META) tlen += (sizeof(SActCont) + tsdbEncodeTable(NULL, pTable) + sizeof(TSCKSUM));
return tlen;
}
static void *tsdbInsertTableAct(STsdbRepo *pRepo, int8_t act, void *buf, STable *pTable) {
SListNode *pNode = (SListNode *)buf;
SActObj * pAct = (SActObj *)(pNode->data);
SActCont * pCont = (SActCont *)POINTER_SHIFT(pAct, sizeof(*pAct));
void * pBuf = (void *)pCont;
pNode->prev = pNode->next = NULL;
pAct->act = act;
pAct->uid = TABLE_UID(pTable);
if (act == TSDB_UPDATE_META) {
pBuf = (void *)(pCont->cont);
pCont->len = tsdbEncodeTable(&pBuf, pTable) + sizeof(TSCKSUM);
taosCalcChecksumAppend(0, (uint8_t *)pCont->cont, pCont->len);
pBuf = POINTER_SHIFT(pBuf, sizeof(TSCKSUM));
}
tdListAppendNode(pRepo->mem->actList, pNode);
return pBuf;
}
\ No newline at end of file
......@@ -1136,6 +1136,7 @@ static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t
// Init block part
if (tsdbInitHelperBlock(pHelper) < 0) goto _err;
// TODO: pMeta->maxRowBytes and pMeta->maxCols may change here causing invalid write
pHelper->pBuffer =
tmalloc(sizeof(SCompData) + (sizeof(SCompCol) + sizeof(TSCKSUM) + COMP_OVERFLOW_BYTES) * pMeta->maxCols +
pMeta->maxRowBytes * pCfg->maxRowsPerFileBlock + sizeof(TSCKSUM));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册