提交 5a859105 编写于 作者: H Hongze Cheng

more work

上级 fe27ee26
......@@ -19,20 +19,18 @@ typedef struct {
SMemTable *pMemTable;
int32_t minutes;
int8_t precision;
int32_t sfid;
int32_t efid;
TSKEY nCommitKey;
SReadH readh;
SDFileSet wSet;
SArray *aDelInfo;
SArray *aBlkIdx;
SArray *aSupBlk;
SArray *aSubBlk;
SArray *aDelInfo;
} SCommitH;
static int32_t tsdbStartCommit(SCommitH *pCHandle, STsdb *pTsdb);
static int32_t tsdbEndCommit(SCommitH *pCHandle);
static int32_t tsdbCommitToFile(SCommitH *pCHandle, int32_t fid);
static int32_t tsdbCommitDelete(SCommitH *pCHandle);
static int32_t tsdbCommitStart(SCommitH *pCHandle, STsdb *pTsdb);
static int32_t tsdbCommitEnd(SCommitH *pCHandle);
static int32_t tsdbCommitImpl(SCommitH *pCHandle);
int32_t tsdbBegin2(STsdb *pTsdb) {
int32_t code = 0;
......@@ -53,26 +51,19 @@ int32_t tsdbCommit2(STsdb *pTsdb) {
SCommitH ch = {0};
// start to commit
code = tsdbStartCommit(&ch, pTsdb);
code = tsdbCommitStart(&ch, pTsdb);
if (code) {
goto _exit;
}
// commit
for (int32_t fid = ch.sfid; fid <= ch.efid; fid++) {
code = tsdbCommitToFile(&ch, fid);
if (code) {
goto _err;
}
}
code = tsdbCommitDelete(&ch);
code = tsdbCommitImpl(&ch);
if (code) {
goto _err;
}
// end commit
code = tsdbEndCommit(&ch);
code = tsdbCommitEnd(&ch);
if (code) {
goto _exit;
}
......@@ -81,11 +72,11 @@ _exit:
return code;
_err:
// TODO: rollback
tsdbError("vgId:%d failed to commit since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
}
static int32_t tsdbStartCommit(SCommitH *pCHandle, STsdb *pTsdb) {
static int32_t tsdbCommitStart(SCommitH *pCHandle, STsdb *pTsdb) {
int32_t code = 0;
SMemTable *pMemTable = (SMemTable *)pTsdb->mem;
......@@ -100,28 +91,32 @@ static int32_t tsdbStartCommit(SCommitH *pCHandle, STsdb *pTsdb) {
pCHandle->pMemTable = pMemTable;
pCHandle->minutes = pTsdb->keepCfg.days;
pCHandle->precision = pTsdb->keepCfg.precision;
pCHandle->sfid = TSDB_KEY_FID(pMemTable->minKey.ts, pCHandle->minutes, pCHandle->precision);
pCHandle->efid = TSDB_KEY_FID(pMemTable->maxKey.ts, pCHandle->minutes, pCHandle->precision);
pCHandle->nCommitKey = pMemTable->minKey.ts;
code = tsdbInitReadH(&pCHandle->readh, pTsdb);
if (code) {
goto _err;
}
pCHandle->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx));
pCHandle->aBlkIdx = taosArrayInit(0, sizeof(SBlockIdx));
if (pCHandle->aBlkIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pCHandle->aSupBlk = taosArrayInit(1024, sizeof(SBlock));
pCHandle->aSupBlk = taosArrayInit(0, sizeof(SBlock));
if (pCHandle->aSupBlk == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pCHandle->aSubBlk = taosArrayInit(1024, sizeof(SBlock));
pCHandle->aSubBlk = taosArrayInit(0, sizeof(SBlock));
if (pCHandle->aSubBlk == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pCHandle->aDelInfo = taosArrayInit(0, sizeof(SDelInfo));
if (pCHandle->aDelInfo == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
// start FS transaction
tsdbStartFSTxn(pTsdb, 0, 0);
......@@ -132,7 +127,7 @@ _err:
return code;
}
static int32_t tsdbEndCommit(SCommitH *pCHandle) {
static int32_t tsdbCommitEnd(SCommitH *pCHandle) {
int32_t code = 0;
STsdb *pTsdb = pCHandle->pMemTable->pTsdb;
SMemTable *pMemTable = (SMemTable *)pTsdb->imem;
......@@ -144,6 +139,7 @@ static int32_t tsdbEndCommit(SCommitH *pCHandle) {
}
// close handle
taosArrayClear(pCHandle->aDelInfo);
taosArrayClear(pCHandle->aSubBlk);
taosArrayClear(pCHandle->aSupBlk);
taosArrayClear(pCHandle->aBlkIdx);
......@@ -160,10 +156,29 @@ _err:
return code;
}
static int32_t tsdbCommitTableData(SCommitH *pCHandle, SMemData *pMemData, SBlockIdx *pBlockIdx) {
static int32_t tsdbCommitTableStart(SCommitH *pCHandle) {
int32_t code = 0;
// TODO
return code;
}
static int32_t tsdbCommitTableEnd(SCommitH *pCHandle) {
int32_t code = 0;
// TODO
return code;
}
static int32_t tsdbCommitTable(SCommitH *pCHandle, SMemData *pMemData, SBlockIdx *pBlockIdx) {
int32_t code = 0;
SMemDataIter iter = {0};
// commit table start
code = tsdbCommitTableStart(pCHandle);
if (code) {
goto _err;
}
// commit table impl
if (pMemData && pBlockIdx) {
// merge
} else if (pMemData) {
......@@ -172,6 +187,15 @@ static int32_t tsdbCommitTableData(SCommitH *pCHandle, SMemData *pMemData, SBloc
// save old ones
}
// commit table end
code = tsdbCommitTableEnd(pCHandle);
if (code) {
goto _err;
}
return code;
_err:
return code;
}
......@@ -193,38 +217,58 @@ static int32_t tsdbTableIdCmprFn(const void *p1, const void *p2) {
return 0;
}
static int32_t tsdbCommitToFile(SCommitH *pCHandle, int32_t fid) {
static int32_t tsdbWriteBlockIdx(SDFile *pFile, SArray *pArray, uint8_t **ppBuf) {
int32_t code = 0;
// TODO
return code;
}
static int32_t tsdbCommitFileStart(SCommitH *pCHandle, int32_t fid) {
int32_t code = 0;
// TODO
return code;
}
static int32_t tsdbCommitFileEnd(SCommitH *pCHandle) {
int32_t code = 0;
// TODO
return code;
}
static int32_t tsdbCommitFile(SCommitH *pCHandle, int32_t fid) {
int32_t code = 0;
SMemDataIter iter = {0};
TSDBROW *pRow = NULL;
int8_t hasData = 0;
TSKEY fidSKey;
TSKEY fidEKey;
int32_t iMemData = 0;
int32_t nMemData = taosArrayGetSize(pCHandle->pMemTable->aMemData);
int32_t iBlockIdx = 0;
int32_t iMemData;
int32_t nMemData;
int32_t iBlockIdx;
int32_t nBlockIdx;
// check if there are data in the time range
for (; iMemData < nMemData; iMemData++) {
SMemData *pMemData = (SMemData *)taosArrayGetP(pCHandle->pMemTable->aMemData, iMemData);
tsdbMemDataIterOpen(pMemData, &(TSDBKEY){.ts = fidSKey, .version = 0}, 0, &iter);
tsdbMemDataIterGet(&iter, &pRow);
if (pRow->tsRow.ts >= fidSKey && pRow->tsRow.ts <= fidEKey) {
hasData = 1;
break;
}
}
if (!hasData) return code;
pCHandle->nCommitKey = TSKEY_MAX;
// create or open the file to commit(todo)
code = tsdbCommitFileStart(pCHandle, fid);
if (code) {
goto _err;
}
// loop to commit each table data
iMemData = 0;
nMemData = taosArrayGetSize(pCHandle->pMemTable->aMemData);
iBlockIdx = 0;
nBlockIdx = 0;
for (;;) {
if (iBlockIdx >= nBlockIdx && iMemData >= nMemData) break;
if (iBlockIdx >= nBlockIdx && iMemData >= nMemData) {
// code = tsdbWriteBlockIdx();
// if (code) {
// goto _err;
// }
break;
}
SMemData *pMemData = NULL;
SBlockIdx *pBlockIdx = NULL;
......@@ -256,12 +300,42 @@ static int32_t tsdbCommitToFile(SCommitH *pCHandle, int32_t fid) {
}
}
code = tsdbCommitTableData(pCHandle, pMemData, pBlockIdx);
code = tsdbCommitTable(pCHandle, pMemData, pBlockIdx);
if (code) {
goto _err;
}
}
// close file
code = tsdbCommitFileEnd(pCHandle);
if (code) {
goto _err;
}
return code;
_err:
return code;
}
static int32_t tsdbCommitData(SCommitH *pCHandle) {
int32_t code = 0;
int32_t fid;
if (pCHandle->pMemTable->nRows == 0) goto _exit;
// loop to commit to each file
for (;;) {
if (pCHandle->nCommitKey == TSKEY_MAX) break;
fid = TSDB_KEY_FID(pCHandle->nCommitKey, pCHandle->minutes, pCHandle->precision);
code = tsdbCommitFile(pCHandle, fid);
if (code) {
goto _err;
}
}
_exit:
return code;
_err:
......@@ -320,9 +394,46 @@ static int32_t tsdbCommitDelete(SCommitH *pCHandle) {
taosArraySort(pCHandle->aDelInfo, delInfoCmprFn);
// write to new file
_exit:
return code;
_err:
return code;
}
static int32_t tsdbCommitCache(SCommitH *pCHandle) {
int32_t code = 0;
// TODO
return code;
}
static int32_t tsdbCommitImpl(SCommitH *pCHandle) {
int32_t code = 0;
// commit data
code = tsdbCommitData(pCHandle);
if (code) {
goto _err;
}
// commit delete
code = tsdbCommitDelete(pCHandle);
if (code) {
goto _err;
}
// commit cache if need (todo)
if (0) {
code = tsdbCommitCache(pCHandle);
if (code) {
goto _err;
}
}
return code;
_err:
return code;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册