提交 3324f4e5 编写于 作者: H Hongze Cheng

more work

上级 2b019dce
......@@ -183,6 +183,8 @@ void tsdbFidKeyRange(int32_t fid, int32_t minutes, int8_t precision, TSKEY *m
int32_t tsdbFidLevel(int32_t fid, STsdbKeepCfg *pKeepCfg, int64_t now);
int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SArray *aSkyline);
void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg);
int32_t tPutColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg);
int32_t tGetColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg);
// tsdbMemTable ==============================================================================================
// SMemTable
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable);
......@@ -238,9 +240,9 @@ int32_t tsdbFSUpsertDelFile(STsdbFS *pFS, SDelFile *pDelFile);
int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet);
int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync);
int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter);
int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx, uint8_t **ppBuf);
int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf, SBlockIdx *pBlockIdx);
int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL, uint8_t **ppBuf);
int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx);
int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, SBlockIdx *pBlockIdx);
int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL);
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo,
int8_t cmprAlg, int8_t toLast, uint8_t **ppBuf);
......@@ -248,9 +250,9 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo);
// SDataFReader
int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet);
int32_t tsdbDataFReaderClose(SDataFReader **ppReader);
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx, uint8_t **ppBuf);
int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL, uint8_t **ppBuf);
int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData, uint8_t **ppBuf);
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx);
int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL);
int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData);
int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int16_t *aColId, int32_t nCol,
SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2);
int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, SBlockData *pBlockData,
......@@ -605,6 +607,9 @@ struct SDataFWriter {
SLastFile fLast;
SSmaFile fSma;
uint8_t *pBuf1;
uint8_t *pBuf2;
SDiskData dData;
};
......
......@@ -465,13 +465,12 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
if (code) goto _err;
// tMapDataReset(&state->blockIdxMap);
// code = tsdbReadBlockIdx(state->pDataFReader, &state->blockIdxMap, NULL);
if (!state->aBlockIdx) {
state->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
} else {
taosArrayClear(state->aBlockIdx);
}
code = tsdbReadBlockIdx(state->pDataFReader, state->aBlockIdx, NULL);
code = tsdbReadBlockIdx(state->pDataFReader, state->aBlockIdx);
if (code) goto _err;
/* if (state->pBlockIdx) { */
......@@ -487,8 +486,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
}
tMapDataReset(&state->blockMap);
code = tsdbReadBlock(state->pDataFReader, state->pBlockIdx, &state->blockMap, NULL);
/* code = tsdbReadBlock(state->pDataFReader, &state->blockIdx, &state->blockMap, NULL); */
code = tsdbReadBlock(state->pDataFReader, state->pBlockIdx, &state->blockMap);
if (code) goto _err;
state->nBlock = state->blockMap.nItem;
......
......@@ -339,7 +339,7 @@ static int32_t tsdbCommitterNextTableData(SCommitter *pCommitter) {
pCommitter->dReader.pBlockIdx =
(SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, pCommitter->dReader.iBlockIdx);
code = tsdbReadBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock, NULL);
code = tsdbReadBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock);
if (code) goto _exit;
ASSERT(pCommitter->dReader.mBlock.nItem > 0);
......@@ -370,7 +370,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
if (code) goto _err;
// data
code = tsdbReadBlockIdx(pCommitter->dReader.pReader, pCommitter->dReader.aBlockIdx, NULL);
code = tsdbReadBlockIdx(pCommitter->dReader.pReader, pCommitter->dReader.aBlockIdx);
if (code) goto _err;
pCommitter->dReader.iBlockIdx = 0;
......@@ -378,8 +378,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
pCommitter->dReader.pBlockIdx =
(SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, pCommitter->dReader.iBlockIdx);
code =
tsdbReadBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock, NULL);
code = tsdbReadBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock);
if (code) goto _err;
} else {
pCommitter->dReader.pBlockIdx = NULL;
......@@ -387,7 +386,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
tBlockDataReset(&pCommitter->dReader.bData);
// last
code = tsdbReadBlockL(pCommitter->dReader.pReader, pCommitter->dReader.aBlockL, NULL);
code = tsdbReadBlockL(pCommitter->dReader.pReader, pCommitter->dReader.aBlockL);
if (code) goto _err;
pCommitter->dReader.iBlockL = -1;
......@@ -506,7 +505,7 @@ static int32_t tsdbCommitDataBlock(SCommitter *pCommitter, SBlock *pBlock) {
tBlockReset(&block); // as a new block
}
// statistic
// info
block.nRow += pBlockData->nRow;
for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
TSDBKEY key = {.ts = pBlockData->aTSKEY[iRow], .version = pBlockData->aVersion[iRow]};
......@@ -532,7 +531,7 @@ static int32_t tsdbCommitDataBlock(SCommitter *pCommitter, SBlock *pBlock) {
// write
block.nSubBlock++;
code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBlockData, &block.aSubBlock[block.nSubBlock - 1],
((block.nSubBlock == 1) && block.hasDup) ? &block.smaInfo : NULL, pCommitter->cmprAlg, 0,
((block.nSubBlock == 1) && !block.hasDup) ? &block.smaInfo : NULL, pCommitter->cmprAlg, 0,
NULL);
if (code) goto _err;
......@@ -557,7 +556,7 @@ static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) {
ASSERT(pBlockData->nRow > 0);
// statistic
// info
blockL.suid = pBlockData->suid;
blockL.nRow = pBlockData->nRow;
blockL.minVer = VERSION_MAX;
......@@ -1041,7 +1040,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) {
// end
if (pCommitter->dWriter.mBlock.nItem > 0) {
SBlockIdx blockIdx = {.suid = pTbData->suid, .uid = pTbData->uid};
code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, NULL, &blockIdx);
code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, &blockIdx);
if (code) goto _err;
if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) {
......@@ -1067,11 +1066,11 @@ static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
int32_t code = 0;
// write aBlockIdx
code = tsdbWriteBlockIdx(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockIdx, NULL);
code = tsdbWriteBlockIdx(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockIdx);
if (code) goto _err;
// write aBlockL
code = tsdbWriteBlockL(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockL, NULL);
code = tsdbWriteBlockL(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockL);
if (code) goto _err;
// update file header
......@@ -1107,7 +1106,7 @@ static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) {
if (pCommitter->dReader.pBlockIdx == NULL || tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, &toTable) >= 0) break;
SBlockIdx blockIdx = *pCommitter->dReader.pBlockIdx;
code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dReader.mBlock, NULL, &blockIdx);
code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dReader.mBlock, &blockIdx);
if (code) goto _err;
if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) {
......
......@@ -545,7 +545,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader,
SArray* aBlockIdx = taosArrayInit(8, sizeof(SBlockIdx));
int64_t st = taosGetTimestampUs();
int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx, NULL);
int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx);
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
......@@ -617,7 +617,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_
STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t));
tMapDataReset(&pScanInfo->mapData);
tsdbReadBlock(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData, NULL);
tsdbReadBlock(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
size += pScanInfo->mapData.nData;
for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
......
......@@ -441,6 +441,9 @@ struct SDataFReader {
TdFilePtr pDataFD;
TdFilePtr pLastFD;
TdFilePtr pSmaFD;
uint8_t *pBuf1;
uint8_t *pBuf2;
};
int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet) {
......@@ -523,6 +526,8 @@ int32_t tsdbDataFReaderClose(SDataFReader **ppReader) {
goto _err;
}
tFree((*ppReader)->pBuf1);
tFree((*ppReader)->pBuf2);
taosMemoryFree(*ppReader);
_exit:
......@@ -534,14 +539,12 @@ _err:
return code;
}
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx, uint8_t **ppBuf) {
int32_t code = 0;
int64_t offset = pReader->pSet->pHeadF->offset;
int64_t size = pReader->pSet->pHeadF->loffset - offset;
uint8_t *pBuf = NULL;
int64_t n;
uint32_t delimiter;
SBlockIdx blockIdx;
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx) {
int32_t code = 0;
int64_t offset = pReader->pSet->pHeadF->offset;
int64_t size = pReader->pSet->pHeadF->loffset - offset;
int64_t n;
uint32_t delimiter;
taosArrayClear(aBlockIdx);
if (size == 0) {
......@@ -549,8 +552,7 @@ int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx, uint8_t **ppB
}
// alloc
if (!ppBuf) ppBuf = &pBuf;
code = tRealloc(ppBuf, size);
code = tRealloc(&pReader->pBuf1, size);
if (code) goto _err;
// seek
......@@ -560,7 +562,7 @@ int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx, uint8_t **ppB
}
// read
n = taosReadFile(pReader->pHeadFD, *ppBuf, size);
n = taosReadFile(pReader->pHeadFD, pReader->pBuf1, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
......@@ -570,18 +572,19 @@ int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx, uint8_t **ppB
}
// check
if (!taosCheckChecksumWhole(*ppBuf, size)) {
if (!taosCheckChecksumWhole(pReader->pBuf1, size)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
// decode
n = 0;
n = tGetU32(*ppBuf + n, &delimiter);
n = tGetU32(pReader->pBuf1 + n, &delimiter);
ASSERT(delimiter == TSDB_FILE_DLMT);
while (n < size - sizeof(TSCKSUM)) {
n += tGetBlockIdx(*ppBuf + n, &blockIdx);
SBlockIdx blockIdx;
n += tGetBlockIdx(pReader->pBuf1 + n, &blockIdx);
if (taosArrayPush(aBlockIdx, &blockIdx) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
......@@ -591,24 +594,20 @@ int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx, uint8_t **ppB
ASSERT(n + sizeof(TSCKSUM) == size);
tFree(pBuf);
_exit:
return code;
_err:
tsdbError("vgId:%d, read block idx failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
tFree(pBuf);
return code;
}
int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL, uint8_t **ppBuf) {
int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL) {
int32_t code = 0;
int64_t offset = pReader->pSet->pHeadF->loffset;
int64_t size = pReader->pSet->pHeadF->size - offset;
int64_t n;
uint32_t delimiter;
uint8_t *pBuf = NULL;
SBlockL blockl;
taosArrayClear(aBlockL);
if (size == 0) {
......@@ -616,8 +615,7 @@ int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL, uint8_t **ppBuf)
}
// alloc
if (!ppBuf) ppBuf = &pBuf;
code = tRealloc(ppBuf, size);
code = tRealloc(&pReader->pBuf1, size);
if (code) goto _err;
// seek
......@@ -627,7 +625,7 @@ int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL, uint8_t **ppBuf)
}
// read
n = taosReadFile(pReader->pHeadFD, *ppBuf, size);
n = taosReadFile(pReader->pHeadFD, pReader->pBuf1, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
......@@ -637,18 +635,19 @@ int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL, uint8_t **ppBuf)
}
// check
if (!taosCheckChecksumWhole(*ppBuf, size)) {
if (!taosCheckChecksumWhole(pReader->pBuf1, size)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
// decode
n = 0;
n = tGetU32(*ppBuf + n, &delimiter);
n = tGetU32(pReader->pBuf1 + n, &delimiter);
ASSERT(delimiter == TSDB_FILE_DLMT);
while (n < size - sizeof(TSCKSUM)) {
n += tGetBlockL(*ppBuf + n, &blockl);
SBlockL blockl;
n += tGetBlockL(pReader->pBuf1 + n, &blockl);
if (taosArrayPush(aBlockL, &blockl) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
......@@ -658,29 +657,23 @@ int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL, uint8_t **ppBuf)
ASSERT(n + sizeof(TSCKSUM) == size);
tFree(pBuf);
_exit:
return code;
_err:
tsdbError("vgId:%d read blockl failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
tFree(pBuf);
return code;
}
int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBlock, uint8_t **ppBuf) {
int32_t code = 0;
int64_t offset = pBlockIdx->offset;
int64_t size = pBlockIdx->size;
uint8_t *pBuf = NULL;
int64_t n;
int64_t tn;
SDiskDataHdr hdr;
if (!ppBuf) ppBuf = &pBuf;
int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBlock) {
int32_t code = 0;
int64_t offset = pBlockIdx->offset;
int64_t size = pBlockIdx->size;
int64_t n;
int64_t tn;
// alloc
code = tRealloc(ppBuf, size);
code = tRealloc(&pReader->pBuf1, size);
if (code) goto _err;
// seek
......@@ -690,7 +683,7 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBl
}
// read
n = taosReadFile(pReader->pHeadFD, *ppBuf, size);
n = taosReadFile(pReader->pHeadFD, pReader->pBuf1, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
......@@ -700,19 +693,19 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBl
}
// check
if (!taosCheckChecksumWhole(*ppBuf, size)) {
if (!taosCheckChecksumWhole(pReader->pBuf1, size)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
// decode
hdr = *(SDiskDataHdr *)(*ppBuf);
ASSERT(hdr.delimiter == TSDB_FILE_DLMT);
ASSERT(hdr.suid == pBlockIdx->suid);
ASSERT(hdr.uid == pBlockIdx->uid);
n = 0;
uint32_t delimiter;
n += tGetU32(pReader->pBuf1 + n, &delimiter);
ASSERT(delimiter == TSDB_FILE_DLMT);
n = sizeof(hdr);
tn = tGetMapData(*ppBuf + n, mBlock);
tn = tGetMapData(pReader->pBuf1 + n, mBlock);
if (tn < 0) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
......@@ -720,12 +713,10 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBl
n += tn;
ASSERT(n + sizeof(TSCKSUM) == size);
tFree(pBuf);
return code;
_err:
tsdbError("vgId:%d, read block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
tFree(pBuf);
return code;
}
......@@ -1642,6 +1633,8 @@ int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync) {
}
tDiskDataClear(&(*ppWriter)->dData);
tFree((*ppWriter)->pBuf1);
tFree((*ppWriter)->pBuf2);
taosMemoryFree(*ppWriter);
_exit:
*ppWriter = NULL;
......@@ -1732,10 +1725,9 @@ _err:
return code;
}
int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx, uint8_t **ppBuf) {
int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx) {
int32_t code = 0;
SHeadFile *pHeadFile = &pWriter->fHead;
uint8_t *pBuf = NULL;
int64_t size;
int64_t n;
......@@ -1746,29 +1738,28 @@ int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx, uint8_t **pp
}
// prepare
size = tPutU32(NULL, TSDB_FILE_DLMT);
size = sizeof(uint32_t);
for (int32_t iBlockIdx = 0; iBlockIdx < taosArrayGetSize(aBlockIdx); iBlockIdx++) {
size += tPutBlockIdx(NULL, taosArrayGet(aBlockIdx, iBlockIdx));
}
size += sizeof(TSCKSUM);
// alloc
if (!ppBuf) ppBuf = &pBuf;
code = tRealloc(ppBuf, size);
code = tRealloc(&pWriter->pBuf1, size);
if (code) goto _err;
// build
n = 0;
n = tPutU32(*ppBuf + n, TSDB_FILE_DLMT);
n = tPutU32(pWriter->pBuf1 + n, TSDB_FILE_DLMT);
for (int32_t iBlockIdx = 0; iBlockIdx < taosArrayGetSize(aBlockIdx); iBlockIdx++) {
n += tPutBlockIdx(*ppBuf + n, taosArrayGet(aBlockIdx, iBlockIdx));
n += tPutBlockIdx(pWriter->pBuf1 + n, taosArrayGet(aBlockIdx, iBlockIdx));
}
taosCalcChecksumAppend(0, *ppBuf, size);
taosCalcChecksumAppend(0, pWriter->pBuf1, size);
ASSERT(n + sizeof(TSCKSUM) == size);
// write
n = taosWriteFile(pWriter->pHeadFD, *ppBuf, size);
n = taosWriteFile(pWriter->pHeadFD, pWriter->pBuf1, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
......@@ -1779,44 +1770,36 @@ int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx, uint8_t **pp
pHeadFile->size += size;
_exit:
tFree(pBuf);
return code;
_err:
tsdbError("vgId:%d, write block idx failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
tFree(pBuf);
return code;
}
int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, uint8_t **ppBuf, SBlockIdx *pBlockIdx) {
int32_t code = 0;
SHeadFile *pHeadFile = &pWriter->fHead;
SDiskDataHdr hdr = {.delimiter = TSDB_FILE_DLMT, .suid = pBlockIdx->suid, .uid = pBlockIdx->uid};
uint8_t *pBuf = NULL;
int64_t size;
int64_t n;
int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, SBlockIdx *pBlockIdx) {
int32_t code = 0;
SHeadFile *pHeadFile = &pWriter->fHead;
int64_t size;
int64_t n;
ASSERT(mBlock->nItem > 0);
// prepare
size = sizeof(SDiskDataHdr) + tPutMapData(NULL, mBlock) + sizeof(TSCKSUM);
// alloc
if (!ppBuf) ppBuf = &pBuf;
code = tRealloc(ppBuf, size);
size = sizeof(uint32_t) + tPutMapData(NULL, mBlock) + sizeof(TSCKSUM);
code = tRealloc(&pWriter->pBuf1, size);
if (code) goto _err;
// build
n = 0;
*(SDiskDataHdr *)(*ppBuf) = hdr;
n += sizeof(hdr);
n += tPutMapData(*ppBuf + n, mBlock);
taosCalcChecksumAppend(0, *ppBuf, size);
n += tPutU32(pWriter->pBuf1 + n, TSDB_FILE_DLMT);
n += tPutMapData(pWriter->pBuf1 + n, mBlock);
taosCalcChecksumAppend(0, pWriter->pBuf1, size);
ASSERT(n + sizeof(TSCKSUM) == size);
// write
n = taosWriteFile(pWriter->pHeadFD, *ppBuf, size);
n = taosWriteFile(pWriter->pHeadFD, pWriter->pBuf1, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
......@@ -1827,21 +1810,18 @@ int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, uint8_t **ppBuf,
pBlockIdx->size = size;
pHeadFile->size += size;
tFree(pBuf);
tsdbTrace("vgId:%d, write block, offset:%" PRId64 " size:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), pBlockIdx->offset,
pBlockIdx->size);
return code;
_err:
tFree(pBuf);
tsdbError("vgId:%d, write block failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
return code;
}
int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL, uint8_t **ppBuf) {
int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL) {
int32_t code = 0;
SHeadFile *pHeadFile = &pWriter->fHead;
uint8_t *pBuf = NULL;
int64_t size;
int64_t n;
......@@ -1852,29 +1832,28 @@ int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL, uint8_t **ppBuf)
}
// size
size = sizeof(uint32_t);
size = sizeof(uint32_t); // TSDB_FILE_DLMT
for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aBlockL); iBlockL++) {
size += tPutBlockL(NULL, taosArrayGet(aBlockL, iBlockL));
}
size += sizeof(TSCKSUM);
// alloc
if (!ppBuf) ppBuf = &pBuf;
code = tRealloc(ppBuf, size);
code = tRealloc(&pWriter->pBuf1, size);
if (code) goto _err;
// encode
n = 0;
n += tPutU32(*ppBuf + n, TSDB_FILE_DLMT);
n += tPutU32(pWriter->pBuf1 + n, TSDB_FILE_DLMT);
for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aBlockL); iBlockL++) {
n += tPutBlockL(*ppBuf + n, taosArrayGet(aBlockL, iBlockL));
n += tPutBlockL(pWriter->pBuf1 + n, taosArrayGet(aBlockL, iBlockL));
}
taosCalcChecksumAppend(0, *ppBuf, size);
taosCalcChecksumAppend(0, pWriter->pBuf1, size);
ASSERT(n + sizeof(TSCKSUM) == size);
// write
n = taosWriteFile(pWriter->pHeadFD, *ppBuf, size);
n = taosWriteFile(pWriter->pHeadFD, pWriter->pBuf1, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
......@@ -1885,12 +1864,10 @@ int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL, uint8_t **ppBuf)
pHeadFile->size += size;
_exit:
tFree(pBuf);
return code;
_err:
tsdbError("vgId:%d tsdb write blockl failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
tFree(pBuf);
return code;
}
......@@ -1918,47 +1895,49 @@ static void tsdbUpdateBlockInfo(SBlockData *pBlockData, SBlock *pBlock) {
pBlock->nRow += pBlockData->nRow;
}
static int32_t tsdbWriteBlockSma(TdFilePtr pFD, SBlockData *pBlockData, SBlockInfo *pSubBlock, uint8_t **ppBuf) {
int32_t code = 0;
int64_t n;
SColData *pColData;
static int32_t tsdbWriteBlockSma(SDataFWriter *pWriter, SBlockData *pBlockData, SSmaInfo *pSmaInfo) {
int32_t code = 0;
// prepare
pSubBlock->nSma = 0;
pSmaInfo->offset = 0;
pSmaInfo->size = 0;
// encode
for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) {
pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);
SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);
if ((!pColData->smaOn) || IS_VAR_DATA_TYPE(pColData->type)) continue;
if (IS_VAR_DATA_TYPE(pColData->type) || (!pColData->smaOn)) continue;
SColumnDataAgg sma;
tsdbCalcColDataSMA(pColData, &sma);
pSubBlock->nSma++;
code = tRealloc(&pWriter->pBuf1, pSmaInfo->size + tPutColumnDataAgg(NULL, &sma));
if (code) goto _err;
pSmaInfo->size += tPutColumnDataAgg(pWriter->pBuf1 + pSmaInfo->size, &sma);
}
if (pSubBlock->nSma == 0) goto _exit;
// calc
code = tRealloc(ppBuf, sizeof(SColumnDataAgg) * pSubBlock->nSma + sizeof(TSCKSUM));
if (code) goto _err;
n = 0;
for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) {
pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);
// write
if (pSmaInfo->size) {
int32_t size = pSmaInfo->size + sizeof(TSCKSUM);
if (IS_VAR_DATA_TYPE(pColData->type) || (!pColData->smaOn)) continue;
code = tRealloc(&pWriter->pBuf1, size);
if (code) goto _err;
tsdbCalcColDataSMA(pColData, &((SColumnDataAgg *)(*ppBuf))[n]);
n++;
}
taosCalcChecksumAppend(0, *ppBuf, sizeof(SColumnDataAgg) * pSubBlock->nSma + sizeof(TSCKSUM));
taosCalcChecksumAppend(0, pWriter->pBuf1, size);
// write
n = taosWriteFile(pFD, *ppBuf, sizeof(SColumnDataAgg) * pSubBlock->nSma + sizeof(TSCKSUM));
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
int64_t n = taosWriteFile(pWriter->pSmaFD, pWriter->pBuf1, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
pSmaInfo->offset = pWriter->fSma.size;
pWriter->fSma.size += size;
}
_exit:
return code;
_err:
tsdbError("vgId:%d tsdb write block sma failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
return code;
}
......@@ -2036,12 +2015,11 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock
for (int32_t iBlockCol = 0; iBlockCol < taosArrayGetSize(pDiskData->aBlockCol); iBlockCol++) {
}
// ================= SMA ====================
_write_sma:
if (toLast) goto _exit;
if (pSmaInfo == NULL) goto _exit;
// TODO
// ================= SMA ====================
if (pSmaInfo) {
code = tsdbWriteBlockSma(pWriter, pBlockData, pSmaInfo);
if (code) goto _err;
}
_exit:
tFree(pBuf);
......
......@@ -58,7 +58,7 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
if (code) goto _err;
// SBlockIdx
code = tsdbReadBlockIdx(pReader->pDataFReader, pReader->aBlockIdx, NULL);
code = tsdbReadBlockIdx(pReader->pDataFReader, pReader->aBlockIdx);
if (code) goto _err;
pReader->iBlockIdx = 0;
......@@ -78,7 +78,7 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
pReader->pBlockIdx = (SBlockIdx*)taosArrayGet(pReader->aBlockIdx, pReader->iBlockIdx);
pReader->iBlockIdx++;
code = tsdbReadBlock(pReader->pDataFReader, pReader->pBlockIdx, &pReader->mBlock, NULL);
code = tsdbReadBlock(pReader->pDataFReader, pReader->pBlockIdx, &pReader->mBlock);
if (code) goto _err;
pReader->iBlock = 0;
......@@ -552,7 +552,7 @@ _err:
static int32_t tsdbSnapMoveWriteTableData(STsdbSnapWriter* pWriter, SBlockIdx* pBlockIdx) {
int32_t code = 0;
code = tsdbReadBlock(pWriter->pDataFReader, pBlockIdx, &pWriter->mBlock, NULL);
code = tsdbReadBlock(pWriter->pDataFReader, pBlockIdx, &pWriter->mBlock);
if (code) goto _err;
// SBlockData
......@@ -791,7 +791,7 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, TABLEID id) {
}
if (pWriter->pBlockIdx) {
code = tsdbReadBlock(pWriter->pDataFReader, pWriter->pBlockIdx, &pWriter->mBlock, NULL);
code = tsdbReadBlock(pWriter->pDataFReader, pWriter->pBlockIdx, &pWriter->mBlock);
if (code) goto _err;
} else {
tMapDataReset(&pWriter->mBlock);
......@@ -843,7 +843,7 @@ static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) {
pWriter->iBlockIdx++;
}
code = tsdbWriteBlockIdx(pWriter->pDataFWriter, pWriter->aBlockIdxW, NULL);
code = tsdbWriteBlockIdx(pWriter->pDataFWriter, pWriter->aBlockIdxW);
if (code) goto _err;
code = tsdbFSUpsertFSet(&pWriter->fs, &pWriter->pDataFWriter->wSet);
......@@ -897,7 +897,7 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
code = tsdbDataFReaderOpen(&pWriter->pDataFReader, pTsdb, pSet);
if (code) goto _err;
code = tsdbReadBlockIdx(pWriter->pDataFReader, pWriter->aBlockIdx, NULL);
code = tsdbReadBlockIdx(pWriter->pDataFReader, pWriter->aBlockIdx);
if (code) goto _err;
} else {
ASSERT(pWriter->pDataFReader == NULL);
......
......@@ -152,25 +152,6 @@ int32_t tTABLEIDCmprFn(const void *p1, const void *p2) {
return 0;
}
// TSDBKEY ======================================================
static FORCE_INLINE int32_t tPutTSDBKEY(uint8_t *p, TSDBKEY *pKey) {
int32_t n = 0;
n += tPutI64v(p ? p + n : p, pKey->version);
n += tPutI64(p ? p + n : p, pKey->ts);
return n;
}
static FORCE_INLINE int32_t tGetTSDBKEY(uint8_t *p, TSDBKEY *pKey) {
int32_t n = 0;
n += tGetI64v(p + n, &pKey->version);
n += tGetI64(p + n, &pKey->ts);
return n;
}
// SBlockIdx ======================================================
int32_t tPutBlockIdx(uint8_t *p, void *ph) {
int32_t n = 0;
......@@ -225,9 +206,9 @@ int32_t tPutBlock(uint8_t *p, void *ph) {
SBlock *pBlock = (SBlock *)ph;
n += tPutI64v(p ? p + n : p, pBlock->minKey.version);
n += tPutI64(p ? p + n : p, pBlock->minKey.ts);
n += tPutI64v(p ? p + n : p, pBlock->minKey.ts);
n += tPutI64v(p ? p + n : p, pBlock->maxKey.version);
n += tPutI64(p ? p + n : p, pBlock->maxKey.ts);
n += tPutI64v(p ? p + n : p, pBlock->maxKey.ts);
n += tPutI64v(p ? p + n : p, pBlock->minVer);
n += tPutI64v(p ? p + n : p, pBlock->maxVer);
n += tPutI32v(p ? p + n : p, pBlock->nRow);
......@@ -251,9 +232,9 @@ int32_t tGetBlock(uint8_t *p, void *ph) {
SBlock *pBlock = (SBlock *)ph;
n += tGetI64v(p + n, &pBlock->minKey.version);
n += tGetI64(p + n, &pBlock->minKey.ts);
n += tGetI64v(p + n, &pBlock->minKey.ts);
n += tGetI64v(p + n, &pBlock->maxKey.version);
n += tGetI64(p + n, &pBlock->maxKey.ts);
n += tGetI64v(p + n, &pBlock->maxKey.ts);
n += tGetI64v(p + n, &pBlock->minVer);
n += tGetI64v(p + n, &pBlock->maxVer);
n += tGetI32v(p + n, &pBlock->nRow);
......@@ -1777,6 +1758,30 @@ int32_t tGetDiskDataHdr(uint8_t *p, void *ph) {
}
// ALGORITHM ==============================
int32_t tPutColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg) {
int32_t n = 0;
n += tPutI16v(p ? p + n : p, pColAgg->colId);
n += tPutI16v(p ? p + n : p, pColAgg->numOfNull);
n += tPutI64(p ? p + n : p, pColAgg->sum);
n += tPutI64(p ? p + n : p, pColAgg->max);
n += tPutI64(p ? p + n : p, pColAgg->min);
return n;
}
int32_t tGetColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg) {
int32_t n = 0;
n += tGetI16v(p + n, &pColAgg->colId);
n += tGetI16v(p + n, &pColAgg->numOfNull);
n += tGetI64(p + n, &pColAgg->sum);
n += tGetI64(p + n, &pColAgg->max);
n += tGetI64(p + n, &pColAgg->min);
return n;
}
void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) {
SColVal colVal;
SColVal *pColVal = &colVal;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册