提交 319e745d 编写于 作者: H Hongze Cheng

more work

上级 1d88e84d
......@@ -219,7 +219,7 @@ SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid);
int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet);
int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync);
int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter);
int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf);
int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx, uint8_t **ppBuf);
int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf, SBlockIdx *pBlockIdx);
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2,
SBlockIdx *pBlockIdx, SBlock *pBlock, int8_t cmprAlg);
......@@ -229,7 +229,7 @@ SDFileSet *tsdbDataFWriterGetWSet(SDataFWriter *pWriter);
// SDataFReader
int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet);
int32_t tsdbDataFReaderClose(SDataFReader **ppReader);
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SMapData *pMapData, uint8_t **ppBuf);
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx, uint8_t **ppBuf);
int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData, uint8_t **ppBuf);
int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int16_t *aColId, int32_t nCol,
SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2);
......
......@@ -31,13 +31,13 @@ typedef struct {
TSKEY maxKey;
// commit file data
SDataFReader *pReader;
SMapData oBlockIdxMap; // SMapData<SBlockIdx>, read from reader
SMapData oBlockMap; // SMapData<SBlock>, read from reader
SArray *aBlockIdx; // SArray<SBlockIdx>
SMapData oBlockMap; // SMapData<SBlock>, read from reader
SBlock oBlock;
SBlockData oBlockData;
SDataFWriter *pWriter;
SMapData nBlockIdxMap; // SMapData<SBlockIdx>, build by committer
SMapData nBlockMap; // SMapData<SBlock>
SArray *aBlockIdxN; // SArray<SBlockIdx>
SMapData nBlockMap; // SMapData<SBlock>
SBlock nBlock;
SBlockData nBlockData;
int64_t suid;
......@@ -245,9 +245,10 @@ static int32_t tsdbCommitDelImpl(SCommitter *pCommitter) {
} else {
goto _commit_disk_del;
}
} else if (pTbData) {
goto _commit_mem_del;
} else {
if (pTbData) goto _commit_mem_del;
if (pDelIdx) goto _commit_disk_del;
goto _commit_disk_del;
}
_commit_mem_del:
......@@ -326,7 +327,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
pCommitter->nextKey = TSKEY_MAX;
// old
tMapDataReset(&pCommitter->oBlockIdxMap);
taosArrayClear(pCommitter->aBlockIdx);
tMapDataReset(&pCommitter->oBlockMap);
tBlockReset(&pCommitter->oBlock);
tBlockDataReset(&pCommitter->oBlockData);
......@@ -335,12 +336,12 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
code = tsdbDataFReaderOpen(&pCommitter->pReader, pTsdb, pRSet);
if (code) goto _err;
code = tsdbReadBlockIdx(pCommitter->pReader, &pCommitter->oBlockIdxMap, NULL);
code = tsdbReadBlockIdx(pCommitter->pReader, pCommitter->aBlockIdx, NULL);
if (code) goto _err;
}
// new
tMapDataReset(&pCommitter->nBlockIdxMap);
taosArrayClear(pCommitter->aBlockIdxN);
tMapDataReset(&pCommitter->nBlockMap);
tBlockReset(&pCommitter->nBlock);
tBlockDataReset(&pCommitter->nBlockData);
......@@ -627,13 +628,16 @@ _err:
static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter, int64_t suid, int64_t uid) {
int32_t code = 0;
SBlockIdx *pBlockIdx = &(SBlockIdx){.suid = suid, .uid = uid};
SBlockIdx blockIdx = {.suid = suid, .uid = uid};
SBlockIdx *pBlockIdx = &blockIdx;
code = tsdbWriteBlock(pCommitter->pWriter, &pCommitter->nBlockMap, NULL, pBlockIdx);
if (code) goto _err;
code = tMapDataPutItem(&pCommitter->nBlockIdxMap, pBlockIdx, tPutBlockIdx);
if (code) goto _err;
if (taosArrayPush(pCommitter->aBlockIdxN, pBlockIdx) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
return code;
......@@ -720,7 +724,8 @@ _err:
static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *pBlockIdx) {
int32_t code = 0;
STbDataIter *pIter = &(STbDataIter){0};
STbDataIter iter = {0};
STbDataIter *pIter = &iter;
TSDBROW *pRow;
int32_t iBlock;
int32_t nBlock;
......@@ -883,19 +888,14 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
int32_t iTbData = 0;
int32_t nTbData = taosArrayGetSize(pMemTable->aTbData);
int32_t iBlockIdx = 0;
int32_t nBlockIdx = pCommitter->oBlockIdxMap.nItem;
int32_t nBlockIdx = taosArrayGetSize(pCommitter->aBlockIdx);
STbData *pTbData;
SBlockIdx *pBlockIdx = &(SBlockIdx){0};
SBlockIdx *pBlockIdx;
ASSERT(nTbData > 0);
pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
if (iBlockIdx < nBlockIdx) {
tMapDataGetItemByIdx(&pCommitter->oBlockIdxMap, iBlockIdx, pBlockIdx, tGetBlockIdx);
} else {
pBlockIdx = NULL;
}
pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->aBlockIdx, iBlockIdx) : NULL;
while (pTbData || pBlockIdx) {
if (pTbData && pBlockIdx) {
int32_t c = tTABLEIDCmprFn(pTbData, pBlockIdx);
......@@ -918,11 +918,7 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
if (code) goto _err;
iTbData++;
if (iTbData < nTbData) {
pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
} else {
pTbData = NULL;
}
pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData) : NULL;
continue;
_commit_table_disk_data:
......@@ -930,11 +926,7 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
if (code) goto _err;
iBlockIdx++;
if (iBlockIdx < nBlockIdx) {
tMapDataGetItemByIdx(&pCommitter->oBlockIdxMap, iBlockIdx, pBlockIdx, tGetBlockIdx);
} else {
pBlockIdx = NULL;
}
pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->aBlockIdx, iBlockIdx) : NULL;
continue;
_commit_table_mem_and_disk:
......@@ -942,17 +934,9 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
if (code) goto _err;
iBlockIdx++;
if (iBlockIdx < nBlockIdx) {
tMapDataGetItemByIdx(&pCommitter->oBlockIdxMap, iBlockIdx, pBlockIdx, tGetBlockIdx);
} else {
pBlockIdx = NULL;
}
pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->aBlockIdx, iBlockIdx) : NULL;
iTbData++;
if (iTbData < nTbData) {
pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
} else {
pTbData = NULL;
}
pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData) : NULL;
continue;
}
......@@ -967,7 +951,7 @@ static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
int32_t code = 0;
// write blockIdx
code = tsdbWriteBlockIdx(pCommitter->pWriter, &pCommitter->nBlockIdxMap, NULL);
code = tsdbWriteBlockIdx(pCommitter->pWriter, pCommitter->aBlockIdxN, NULL);
if (code) goto _err;
// update file header
......@@ -1051,11 +1035,11 @@ static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
int32_t code = 0;
pCommitter->pReader = NULL;
pCommitter->oBlockIdxMap = tMapDataInit();
pCommitter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
pCommitter->oBlockMap = tMapDataInit();
pCommitter->oBlock = tBlockInit();
pCommitter->pWriter = NULL;
pCommitter->nBlockIdxMap = tMapDataInit();
pCommitter->aBlockIdxN = taosArrayInit(0, sizeof(SBlockIdx));
pCommitter->nBlockMap = tMapDataInit();
pCommitter->nBlock = tBlockInit();
code = tBlockDataInit(&pCommitter->oBlockData);
......@@ -1071,11 +1055,11 @@ _exit:
}
static void tsdbCommitDataEnd(SCommitter *pCommitter) {
// tMapDataClear(&pCommitter->oBlockIdxMap);
taosArrayDestroy(pCommitter->aBlockIdx);
// tMapDataClear(&pCommitter->oBlockMap);
// tBlockClear(&pCommitter->oBlock);
// tBlockDataClear(&pCommitter->oBlockData);
// tMapDataClear(&pCommitter->nBlockIdxMap);
taosArrayDestroy(pCommitter->aBlockIdxN);
// tMapDataClear(&pCommitter->nBlockMap);
// tBlockClear(&pCommitter->nBlock);
// tBlockDataClear(&pCommitter->nBlockData);
......
......@@ -747,39 +747,36 @@ _end:
static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) {
int32_t code = 0;
SMapData blockIdxMap = {0};
tMapDataReset(&blockIdxMap);
SArray *aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
code = tsdbReadBlockIdx(pFileReader, &blockIdxMap, NULL);
code = tsdbReadBlockIdx(pFileReader, aBlockIdx, NULL);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
if (blockIdxMap.nItem == 0) {
if (taosArrayGetSize(aBlockIdx) == 0) {
taosArrayClear(aBlockIdx);
return TSDB_CODE_SUCCESS;
}
SBlockIdx blockIndex = {0};
for (int32_t i = 0; i < blockIdxMap.nItem; ++i) {
code = tMapDataGetItemByIdx(&blockIdxMap, i, &blockIndex, tGetBlockIdx);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
SBlockIdx *pBlockIdx;
for (int32_t i = 0; i < taosArrayGetSize(aBlockIdx); ++i) {
pBlockIdx = (SBlockIdx *)taosArrayGet(aBlockIdx, i);
if (blockIndex.suid != pReader->suid) {
if (pBlockIdx->suid != pReader->suid) {
continue;
}
// this block belongs to a table that is not queried.
void* p = taosHashGet(pReader->status.pTableMap, &blockIndex.uid, sizeof(uint64_t));
void* p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t));
if (p == NULL) {
continue;
}
if ((ASCENDING_TRAVERSE(pReader->order) &&
(blockIndex.minKey > pReader->window.ekey || blockIndex.maxKey < pReader->window.skey)) ||
(pBlockIdx->minKey > pReader->window.ekey || pBlockIdx->maxKey < pReader->window.skey)) ||
(!ASCENDING_TRAVERSE(pReader->order) &&
(blockIndex.minKey > pReader->window.skey || blockIndex.maxKey < pReader->window.ekey))) {
(pBlockIdx->minKey > pReader->window.skey || pBlockIdx->maxKey < pReader->window.ekey))) {
continue;
}
......@@ -788,15 +785,15 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader,
pScanInfo->pBlockList = taosArrayInit(16, sizeof(SBlock));
}
pScanInfo->blockIdx = blockIndex;
taosArrayPush(pIndexList, &blockIndex);
pScanInfo->blockIdx = *pBlockIdx;
taosArrayPush(pIndexList, pBlockIdx);
}
// tMapDataClear(&blockIdxMap);
taosArrayDestroy(aBlockIdx);
return TSDB_CODE_SUCCESS;
_err:
tMapDataClear(&blockIdxMap);
taosArrayDestroy(aBlockIdx);
return code;
}
......
......@@ -519,15 +519,18 @@ _err:
return code;
}
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SMapData *mBlockIdx, uint8_t **ppBuf) {
int32_t code = 0;
int64_t offset = pReader->pSet->fHead.offset;
int64_t size = pReader->pSet->fHead.size - offset;
int64_t n;
uint32_t delimiter;
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx, uint8_t **ppBuf) {
int32_t code = 0;
int64_t offset = pReader->pSet->fHead.offset;
int64_t size = pReader->pSet->fHead.size - offset;
uint8_t *pBuf = NULL;
int64_t n;
uint32_t delimiter;
SBlockIdx blockIdx;
if (!ppBuf) ppBuf = &pBuf;
// alloc
if (!ppBuf) ppBuf = &mBlockIdx->pBuf;
code = tsdbRealloc(ppBuf, size);
if (code) goto _err;
......@@ -554,16 +557,27 @@ int32_t tsdbReadBlockIdx(SDataFReader *pReader, SMapData *mBlockIdx, uint8_t **p
}
// decode
n = 0;
n += tGetU32(*ppBuf + n, &delimiter);
n = tGetU32(*ppBuf + n, &delimiter);
ASSERT(delimiter == TSDB_FILE_DLMT);
n += tGetMapData(*ppBuf + n, mBlockIdx);
taosArrayClear(aBlockIdx);
while (n < size - sizeof(TSCKSUM)) {
n += tGetBlockIdx(*ppBuf + n, &blockIdx);
if (taosArrayPush(aBlockIdx, &blockIdx) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
}
ASSERT(n + sizeof(TSCKSUM) == size);
tsdbFree(pBuf);
return code;
_err:
tsdbError("vgId:%d read block idx failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
tsdbFree(pBuf);
return code;
}
......@@ -1257,27 +1271,31 @@ _err:
return code;
}
int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SMapData *mBlockIdx, uint8_t **ppBuf) {
int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx, uint8_t **ppBuf) {
int32_t code = 0;
int64_t size;
SHeadFile *pHeadFile = &pWriter->wSet.fHead;
int64_t n;
uint8_t *pBuf = NULL;
int64_t size;
int64_t n;
if (!ppBuf) ppBuf = &pBuf;
// prepare
size = 0;
size += tPutU32(NULL, TSDB_FILE_DLMT);
size = size + tPutMapData(NULL, mBlockIdx) + sizeof(TSCKSUM);
size = tPutU32(NULL, TSDB_FILE_DLMT);
for (int32_t iBlockIdx = 0; iBlockIdx < taosArrayGetSize(aBlockIdx); iBlockIdx++) {
size += tPutBlockIdx(NULL, taosArrayGet(aBlockIdx, iBlockIdx));
}
size += sizeof(TSCKSUM);
// alloc
if (!ppBuf) ppBuf = &pBuf;
code = tsdbRealloc(ppBuf, size);
if (code) goto _err;
// build
n = 0;
n += tPutU32(*ppBuf + n, TSDB_FILE_DLMT);
n += tPutMapData(*ppBuf + n, mBlockIdx);
n = tPutU32(*ppBuf + n, TSDB_FILE_DLMT);
for (int32_t iBlockIdx = 0; iBlockIdx < taosArrayGetSize(aBlockIdx); iBlockIdx++) {
n += tPutBlockIdx(*ppBuf + n, taosArrayGet(aBlockIdx, iBlockIdx));
}
taosCalcChecksumAppend(0, *ppBuf, size);
ASSERT(n + sizeof(TSCKSUM) == size);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册