提交 607f698f 编写于 作者: H Hongze Cheng

more work

上级 4e9e0c9b
......@@ -55,10 +55,10 @@ typedef struct SBlockData SBlockData;
typedef struct SReadH SReadH;
typedef struct SDelFile SDelFile;
typedef struct STsdbCacheFile STsdbCacheFile;
typedef struct STsdbIndexFile STsdbIndexFile;
typedef struct STsdbDataFile STsdbDataFile;
typedef struct STsdbLastFile STsdbLastFile;
typedef struct STsdbSmaFile STsdbSmaFile;
typedef struct SHeadFile SHeadFile;
typedef struct SDataFile SDataFile;
typedef struct SLastFile SLastFile;
typedef struct SSmaFile SSmaFile;
typedef struct SDFileSet SDFileSet;
typedef struct SDataFWriter SDataFWriter;
typedef struct SDataFReader SDataFReader;
......@@ -109,10 +109,10 @@ void tColDataReset(SColData *pColData);
void tColDataClear(SColData *pColData);
int32_t tColDataCmprFn(const void *p1, const void *p2);
// SBlockData
#define tsdbBlockDataCreate() ((SBlockData){0})
void tsdbBlockDataClear(SBlockData *pBlockData);
int32_t tsdbBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema);
void tsdbBlockDataDestroy(SBlockData *pBlockData);
#define tBlockDataInit() ((SBlockData){0})
void tBlockDataReset(SBlockData *pBlockData);
void tBlockDataClear(SBlockData *pBlockData);
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema);
// SDelIdx
int32_t tPutDelIdx(uint8_t *p, void *ph);
int32_t tGetDelIdx(uint8_t *p, void *ph);
......@@ -419,6 +419,34 @@ struct SBlockDataHdr {
int64_t uid;
};
struct SHeadFile {
int64_t size;
int64_t offset;
int32_t nRef;
};
struct SDataFile {
int64_t size;
int32_t nRef;
};
struct SLastFile {
int64_t size;
int32_t nRef;
};
struct SSmaFile {
int64_t size;
int32_t nRef;
};
struct SDFileSet {
SHeadFile *pHeadFile;
SDataFile *pDataFile;
SLastFile *pLastFile;
SSmaFile *pSmaFile;
};
#ifdef __cplusplus
}
#endif
......
......@@ -23,19 +23,21 @@ typedef struct {
int32_t minRow;
int32_t maxRow;
// --------------
TSKEY nextKey; // need to be reset by each table commit
TSKEY nextKey; // reset by each table commit
int32_t commitFid;
TSKEY minKey;
TSKEY maxKey;
// commit file data
SDataFReader *pReader;
SMapData oBlockIdx; // SMapData<SBlockIdx>, read from reader
SMapData oBlock; // SMapData<SBlock>, read from reader
SBlockData bDataO;
SMapData oBlockIdxMap; // SMapData<SBlockIdx>, read from reader
SMapData oBlockMap; // SMapData<SBlock>, read from reader
SBlock oBlock;
SBlockData oBlockData;
SDataFWriter *pWriter;
SMapData nBlockIdx; // SMapData<SBlockIdx>, build by committer
SMapData nBlock; // SMapData<SBlock>
SBlockData bDataN;
SMapData nBlockIdxMap; // SMapData<SBlockIdx>, build by committer
SMapData nBlockMap; // SMapData<SBlock>
SBlock nBlock;
SBlockData nBlockData;
/* commit del */
SDelFReader *pDelFReader;
SMapData oDelIdxMap; // SMapData<SDelIdx>, old
......@@ -55,13 +57,12 @@ int32_t tsdbBegin(STsdb *pTsdb) {
int32_t code = 0;
code = tsdbMemTableCreate(pTsdb, &pTsdb->mem);
if (code) {
goto _err;
}
if (code) goto _err;
return code;
_err:
tsdbError("vgId:%d tsdb begin failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
}
......@@ -73,7 +74,8 @@ int32_t tsdbCommit(STsdb *pTsdb) {
SMemTable *pMemTable = pTsdb->mem;
// check
if (pMemTable->nRow == 0 && pMemTable->nDel == 0) { // TODO
if (pMemTable->nRow == 0 && pMemTable->nDel == 0) {
// TODO: lock?
pTsdb->mem = NULL;
tsdbMemTableDestroy(pMemTable);
goto _exit;
......@@ -81,31 +83,21 @@ int32_t tsdbCommit(STsdb *pTsdb) {
// start commit
code = tsdbStartCommit(pTsdb, &commith);
if (code) {
goto _err;
}
if (code) goto _err;
// commit impl
code = tsdbCommitData(&commith);
if (code) {
goto _err;
}
if (code) goto _err;
code = tsdbCommitDel(&commith);
if (code) {
goto _err;
}
if (code) goto _err;
code = tsdbCommitCache(&commith);
if (code) {
goto _err;
}
if (code) goto _err;
// end commit
code = tsdbEndCommit(&commith, 0);
if (code) {
goto _err;
}
if (code) goto _err;
_exit:
return code;
......@@ -355,35 +347,35 @@ static int32_t tsdbCommitMemoryData(SCommitter *pCommitter, SBlockIdx *pBlockIdx
pRow = tsdbTbDataIterGet(pIter);
if (pRow == NULL || tsdbKeyCmprFn(&(TSDBKEY){.ts = pRow->pTSRow->ts, .version = pRow->version}, &eKey) > 0) {
if (pCommitter->bDataN.nRow == 0) {
if (pCommitter->nBlockData.nRow == 0) {
break;
} else {
goto _write_block_data;
}
}
code = tsdbBlockDataAppendRow(&pCommitter->bDataN, pRow, NULL /*TODO*/);
code = tBlockDataAppendRow(&pCommitter->nBlockData, pRow, NULL /*TODO*/);
if (code) goto _err;
if (pCommitter->bDataN.nRow < pCommitter->maxRow * 4 / 5) {
if (pCommitter->nBlockData.nRow < pCommitter->maxRow * 4 / 5) {
continue;
}
_write_block_data:
if (!toDataOnly && pCommitter->bDataN.nRow < pCommitter->minKey) {
if (!toDataOnly && pCommitter->nBlockData.nRow < pCommitter->minKey) {
block.last = 1;
} else {
block.last = 0;
}
code = tsdbWriteBlockData(pCommitter->pWriter, &pCommitter->bDataN, NULL, NULL, pBlockIdx, &block);
code = tsdbWriteBlockData(pCommitter->pWriter, &pCommitter->nBlockData, NULL, NULL, pBlockIdx, &block);
if (code) goto _err;
code = tMapDataPutItem(&pCommitter->nBlock, &block, tPutBlock);
code = tMapDataPutItem(&pCommitter->nBlockMap, &block, tPutBlock);
if (code) goto _err;
tBlockReset(&block);
tsdbBlockDataClear(&pCommitter->bDataN);
tBlockDataReset(&pCommitter->nBlockData);
}
return code;
......@@ -431,19 +423,19 @@ static int32_t tsdbMergeCommitImpl(SCommitter *pCommitter, SBlockIdx *pBlockIdx,
TSDBKEY key1;
TSDBKEY key2;
tsdbBlockDataClear(&pCommitter->bDataN);
tBlockDataReset(&pCommitter->nBlockData);
// load last and merge until {pCommitter->maxKey, INT64_MAX}
code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlock, &pCommitter->bDataO, NULL, 0, NULL, NULL);
code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlock, &pCommitter->oBlockData, NULL, 0, NULL, NULL);
if (code) goto _err;
iRow = 0;
nRow = pCommitter->bDataO.nRow;
nRow = pCommitter->oBlockData.nRow;
pRow = tsdbTbDataIterGet(pIter);
while (true) {
if ((pRow == NULL || pRow->pTSRow->ts > pCommitter->maxKey) && (iRow >= nRow)) {
if (pCommitter->bDataN.nRow > 0) {
if (pCommitter->nBlockData.nRow > 0) {
goto _write_block_data;
} else {
break;
......@@ -453,16 +445,16 @@ static int32_t tsdbMergeCommitImpl(SCommitter *pCommitter, SBlockIdx *pBlockIdx,
// TODO
_write_block_data:
block.last = pCommitter->bDataN.nRow < pCommitter->minRow ? 1 : 0;
code = tsdbWriteBlockData(pCommitter->pWriter, &pCommitter->bDataN, NULL, NULL, pBlockIdx, &block);
block.last = pCommitter->nBlockData.nRow < pCommitter->minRow ? 1 : 0;
code = tsdbWriteBlockData(pCommitter->pWriter, &pCommitter->nBlockData, NULL, NULL, pBlockIdx, &block);
if (code) goto _err;
code = tMapDataPutItem(&pCommitter->nBlock, &block, tPutBlock);
code = tMapDataPutItem(&pCommitter->nBlockMap, &block, tPutBlock);
if (code) goto _err;
}
tBlockReset(&block);
tsdbBlockDataClear(&pCommitter->bDataN);
tBlockDataReset(&pCommitter->nBlockData);
return code;
......@@ -475,7 +467,7 @@ static int32_t tsdbMergeCommit(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STb
int32_t code = 0;
TSDBROW *pRow;
SBlock block = tBlockInit();
SBlockData bDataN;
SBlockData nBlockData;
TSDBKEY key;
int32_t c;
......@@ -503,7 +495,7 @@ static int32_t tsdbMergeCommit(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STb
c = tBlockCmprFn(&(SBlock){.info.maxKey = key, .info.minKey = key}, pBlock);
if (c > 0) {
// move block
code = tMapDataPutItem(&pCommitter->nBlock, pBlock, tPutBlock);
code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
if (code) goto _err;
} else if (c == 0) {
int32_t nOverlap = tsdbGetOverlapRowNumber(pIter, pBlock);
......@@ -545,10 +537,10 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
if (ROW_END(pRow, pCommitter->maxKey) && pBlockIdx == NULL) goto _exit;
// start ================================
tMapDataReset(&pCommitter->oBlock);
tMapDataReset(&pCommitter->nBlock);
tMapDataReset(&pCommitter->oBlockMap);
tMapDataReset(&pCommitter->nBlockMap);
if (pBlockIdx) {
code = tsdbReadBlock(pCommitter->pReader, pBlockIdx, &pCommitter->oBlock, NULL);
code = tsdbReadBlock(pCommitter->pReader, pBlockIdx, &pCommitter->oBlockMap, NULL);
if (code) goto _err;
}
......@@ -556,12 +548,12 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
SBlock block;
SBlock *pBlock = &block;
int32_t iBlock = 0;
int32_t nBlock = pCommitter->oBlock.nItem;
int32_t nBlockMap = pCommitter->oBlockMap.nItem;
// merge
pRow = tsdbTbDataIterGet(pIter);
while (!ROW_END(pRow, pCommitter->maxKey) && iBlock < nBlock) {
tMapDataGetItemByIdx(&pCommitter->oBlock, iBlock, pBlock, tGetBlock);
while (!ROW_END(pRow, pCommitter->maxKey) && iBlock < nBlockMap) {
tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
code = tsdbMergeCommit(pCommitter, &blockIdx, pIter, pBlock);
if (code) goto _err;
......@@ -579,8 +571,8 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
}
// disk
while (iBlock < nBlock) {
tMapDataGetItemByIdx(&pCommitter->oBlock, iBlock, pBlock, tGetBlock);
while (iBlock < nBlockMap) {
tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
code = tsdbMergeCommit(pCommitter, &blockIdx, NULL, pBlock);
if (code) goto _err;
......@@ -589,10 +581,10 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
}
// end ===============================
code = tsdbWriteBlock(pCommitter->pWriter, &pCommitter->nBlock, NULL, &blockIdx);
code = tsdbWriteBlock(pCommitter->pWriter, &pCommitter->nBlockMap, NULL, &blockIdx);
if (code) goto _err;
code = tMapDataPutItem(&pCommitter->nBlockIdx, &blockIdx, tPutBlockIdx);
code = tMapDataPutItem(&pCommitter->nBlockIdxMap, &blockIdx, tPutBlockIdx);
if (code) goto _err;
_exit:
......@@ -619,21 +611,25 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
// memory
pCommitter->nextKey = TSKEY_MAX;
tMapDataReset(&pCommitter->oBlockIdx);
tMapDataReset(&pCommitter->oBlock);
tMapDataReset(&pCommitter->nBlockIdx);
tMapDataReset(&pCommitter->nBlock);
// load old
// old
tMapDataReset(&pCommitter->oBlockIdxMap);
tMapDataReset(&pCommitter->oBlockMap);
tBlockReset(&pCommitter->oBlock);
tBlockDataReset(&pCommitter->oBlockData);
if (pRSet) {
code = tsdbDataFReaderOpen(&pCommitter->pReader, pTsdb, pRSet);
if (code) goto _err;
code = tsdbReadBlockIdx(pCommitter->pReader, &pCommitter->oBlockIdx, NULL);
code = tsdbReadBlockIdx(pCommitter->pReader, &pCommitter->oBlockIdxMap, NULL);
if (code) goto _err;
}
// create new
// new
tMapDataReset(&pCommitter->nBlockIdxMap);
tMapDataReset(&pCommitter->nBlockMap);
tBlockReset(&pCommitter->nBlock);
tBlockDataReset(&pCommitter->nBlockData);
code = tsdbDataFWriterOpen(&pCommitter->pWriter, pTsdb, pWSet);
if (code) goto _err;
......@@ -653,18 +649,16 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
int32_t iTbData = 0;
int32_t nTbData = taosArrayGetSize(pMemTable->aTbData);
int32_t iBlockIdx = 0;
int32_t nBlockIdx = pCommitter->oBlockIdx.nItem;
int32_t nBlockIdx = pCommitter->oBlockIdxMap.nItem;
STbData *pTbData;
SBlockIdx *pBlockIdx;
SBlockIdx blockIdx;
SBlockIdx *pBlockIdx = &blockIdx;
ASSERT(nTbData > 0);
pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
if (iBlockIdx < nBlockIdx) {
pBlockIdx = &blockIdx;
code = tMapDataGetItemByIdx(&pCommitter->oBlockIdx, iBlockIdx, pBlockIdx, tGetBlockIdx);
if (code) goto _err;
tMapDataGetItemByIdx(&pCommitter->oBlockIdxMap, iBlockIdx, pBlockIdx, tGetBlockIdx);
} else {
pBlockIdx = NULL;
}
......@@ -706,9 +700,7 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
iBlockIdx++;
if (iBlockIdx < nBlockIdx) {
pBlockIdx = &blockIdx;
code = tMapDataGetItemByIdx(&pCommitter->oBlockIdx, iBlockIdx, pBlockIdx, tGetBlockIdx);
if (code) goto _err;
tMapDataGetItemByIdx(&pCommitter->oBlockIdxMap, iBlockIdx, pBlockIdx, tGetBlockIdx);
} else {
pBlockIdx = NULL;
}
......@@ -726,9 +718,7 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
pTbData = NULL;
}
if (iBlockIdx < nBlockIdx) {
pBlockIdx = &blockIdx;
code = tMapDataGetItemByIdx(&pCommitter->oBlockIdx, iBlockIdx, pBlockIdx, tGetBlockIdx);
if (code) goto _err;
tMapDataGetItemByIdx(&pCommitter->oBlockIdxMap, iBlockIdx, pBlockIdx, tGetBlockIdx);
} else {
pBlockIdx = NULL;
}
......@@ -746,7 +736,7 @@ static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
int32_t code = 0;
// write blockIdx
code = tsdbWriteBlockIdx(pCommitter->pWriter, &pCommitter->nBlockIdx, NULL);
code = tsdbWriteBlockIdx(pCommitter->pWriter, &pCommitter->nBlockIdxMap, NULL);
if (code) goto _err;
// update file header
......@@ -775,25 +765,20 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
// commit file data start
code = tsdbCommitFileDataStart(pCommitter);
if (code) {
goto _err;
}
if (code) goto _err;
// commit file data impl
code = tsdbCommitFileDataImpl(pCommitter);
if (code) {
goto _err;
}
if (code) goto _err;
// commit file data end
code = tsdbCommitFileDataEnd(pCommitter);
if (code) {
goto _err;
}
if (code) goto _err;
return code;
_err:
tsdbError("vgId:%d commit file data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
return code;
}
......@@ -809,6 +794,10 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
// unlock();
pCommitter->pTsdb = pTsdb;
pCommitter->minutes = pTsdb->keepCfg.days;
pCommitter->precision = pTsdb->keepCfg.precision;
pCommitter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
pCommitter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
return code;
}
......
......@@ -19,34 +19,6 @@ static const char *tsdbFileSuffix[] = {".tombstone", ".cache", ".index", ".data"
// .tombstone
struct STsdbIndexFile {
int64_t size;
int64_t offset;
int32_t nRef;
};
struct STsdbDataFile {
int64_t size;
int32_t nRef;
};
struct STsdbLastFile {
int64_t size;
int32_t nRef;
};
struct STsdbSmaFile {
int64_t size;
int32_t nRef;
};
struct SDFileSet {
STsdbIndexFile *pIndexF;
STsdbDataFile *pDataF;
STsdbLastFile *pLastF;
STsdbSmaFile *pSmaF;
};
// SDelFile ===============================================
char *tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile) {
char *pName = NULL;
......
......@@ -493,7 +493,7 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i
SSubmitBlkIter blkIter = {0};
TSDBKEY key = {.version = version};
SMemSkipListNode *pos[SL_MAX_LEVEL];
TSDBROW row = {.version = version, .pTSRow = NULL};
TSDBROW row = tsdbRowFromTSRow(version, NULL);
int32_t nRow = 0;
tInitSubmitBlkIter(pMsgIter, pBlock, &blkIter);
......
......@@ -441,15 +441,15 @@ _err:
return code;
}
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SMapData *pMapData, uint8_t **ppBuf) {
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SMapData *mBlockIdx, uint8_t **ppBuf) {
int32_t code = 0;
int64_t offset = -1; // TODO
int64_t size = -1; // TODO
int64_t offset = pReader->pSet->pHeadFile->offset;
int64_t size = pReader->pSet->pHeadFile->size;
int64_t n;
uint32_t delimiter;
// alloc
if (!ppBuf) ppBuf = &pMapData->pBuf;
if (!ppBuf) ppBuf = &mBlockIdx->pBuf;
code = tsdbRealloc(ppBuf, size);
if (code) goto _err;
......@@ -479,7 +479,7 @@ int32_t tsdbReadBlockIdx(SDataFReader *pReader, SMapData *pMapData, uint8_t **pp
n = 0;
n += tGetU32(*ppBuf + n, &delimiter);
ASSERT(delimiter == TSDB_FILE_DLMT);
n += tGetMapData(*ppBuf + n, pMapData);
n += tGetMapData(*ppBuf + n, mBlockIdx);
ASSERT(n + sizeof(TSCKSUM) == size);
return code;
......@@ -489,7 +489,7 @@ _err:
return code;
}
int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData, uint8_t **ppBuf) {
int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBlockIdx, uint8_t **ppBuf) {
int32_t code = 0;
int64_t offset = pBlockIdx->offset;
int64_t size = pBlockIdx->size;
......@@ -499,7 +499,7 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMa
tb_uid_t uid;
// alloc
if (!ppBuf) ppBuf = &pMapData->pBuf;
if (!ppBuf) ppBuf = &mBlockIdx->pBuf;
code = tsdbRealloc(ppBuf, size);
if (code) goto _err;
......@@ -533,7 +533,7 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMa
ASSERT(suid == pBlockIdx->suid);
n += tGetI64(*ppBuf + n, &uid);
ASSERT(uid == pBlockIdx->uid);
n += tGetMapData(*ppBuf + n, pMapData);
n += tGetMapData(*ppBuf + n, mBlockIdx);
ASSERT(n + sizeof(TSCKSUM) == size);
return code;
......
......@@ -147,7 +147,7 @@ int32_t tGetMapData(uint8_t *p, SMapData *pMapData) {
default:
ASSERT(0);
}
n += tGetBinary(p ? p + n : p, &pMapData->pData, &pMapData->nData);
n += tGetBinary(p + n, &pMapData->pData, &pMapData->nData);
return n;
}
......@@ -697,14 +697,7 @@ static int32_t tsdbBlockDataAppendRow1(SBlockData *pBlockData, TSDBROW *pRow) {
return code;
}
void tsdbBlockDataClear(SBlockData *pBlockData) {
pBlockData->nRow = 0;
for (int32_t iCol = 0; iCol < pBlockData->nCol; iCol++) {
pBlockData->aColData[iCol] = (SColData){.cid = 0, .type = 0, .bytes = 0, .flags = 0, .nData = 0};
}
}
int32_t tsdbBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema) {
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema) {
int32_t code = 0;
if (pRow->type == 0) {
......@@ -716,7 +709,12 @@ int32_t tsdbBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *
return code;
}
void tsdbBlockDataDestroy(SBlockData *pBlockData) {
void tBlockDataReset(SBlockData *pBlockData) {
pBlockData->nRow = 0;
pBlockData->nCol = 0;
}
void tBlockDataClear(SBlockData *pBlockData) {
tsdbFree((uint8_t *)pBlockData->aVersion);
tsdbFree((uint8_t *)pBlockData->aTSKEY);
for (int32_t iCol = 0; iCol < pBlockData->nCol; iCol++) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册