提交 3436d523 编写于 作者: H Hongze Cheng

more refact

上级 e1881185
......@@ -265,14 +265,14 @@ int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *p
// SDelFWriter
int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb);
int32_t tsdbDelFWriterClose(SDelFWriter **ppWriter, int8_t sync);
int32_t tsdbWriteDelData(SDelFWriter *pWriter, SArray *aDelData, uint8_t **ppBuf, SDelIdx *pDelIdx);
int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SArray *aDelIdx, uint8_t **ppBuf);
int32_t tsdbWriteDelData(SDelFWriter *pWriter, SArray *aDelData, SDelIdx *pDelIdx);
int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SArray *aDelIdx);
int32_t tsdbUpdateDelFileHdr(SDelFWriter *pWriter);
// SDelFReader
int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb, uint8_t **ppBuf);
int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb);
int32_t tsdbDelFReaderClose(SDelFReader **ppReader);
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData, uint8_t **ppBuf);
int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx, uint8_t **ppBuf);
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData);
int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx);
// tsdbRead.c ==============================================================================================
int32_t tsdbTakeReadSnap(STsdb *pTsdb, STsdbReadSnap **ppSnap);
void tsdbUntakeReadSnap(STsdb *pTsdb, STsdbReadSnap *pSnap);
......@@ -583,6 +583,8 @@ struct SDelFWriter {
STsdb *pTsdb;
SDelFile fDel;
TdFilePtr pWriteH;
uint8_t *pBuf1;
};
struct SDiskData {
......
......@@ -317,7 +317,7 @@ static int32_t getTableDelDataFromDelIdx(SDelFReader *pDelReader, SDelIdx *pDelI
int32_t code = 0;
if (pDelIdx) {
code = tsdbReadDelData(pDelReader, pDelIdx, aDelData, NULL);
code = tsdbReadDelData(pDelReader, pDelIdx, aDelData);
}
return code;
......@@ -388,8 +388,7 @@ static int32_t getTableDelIdx(SDelFReader *pDelFReader, tb_uid_t suid, tb_uid_t
SDelIdx idx = {.suid = suid, .uid = uid};
// tMapDataReset(&delIdxMap);
// code = tsdbReadDelIdx(pDelFReader, &delIdxMap, NULL);
code = tsdbReadDelIdx(pDelFReader, pDelIdxArray, NULL);
code = tsdbReadDelIdx(pDelFReader, pDelIdxArray);
if (code) goto _err;
// code = tMapDataSearch(&delIdxMap, &idx, tGetDelIdx, tCmprDelIdx, pDelIdx);
......
......@@ -181,10 +181,10 @@ static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
SDelFile *pDelFileR = pCommitter->fs.pDelFile;
if (pDelFileR) {
code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pDelFileR, pTsdb, NULL);
code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pDelFileR, pTsdb);
if (code) goto _err;
code = tsdbReadDelIdx(pCommitter->pDelFReader, pCommitter->aDelIdx, NULL);
code = tsdbReadDelIdx(pCommitter->pDelFReader, pCommitter->aDelIdx);
if (code) goto _err;
}
......@@ -221,7 +221,7 @@ static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDel
suid = pDelIdx->suid;
uid = pDelIdx->uid;
code = tsdbReadDelData(pCommitter->pDelFReader, pDelIdx, pCommitter->aDelData, NULL);
code = tsdbReadDelData(pCommitter->pDelFReader, pDelIdx, pCommitter->aDelData);
if (code) goto _err;
} else {
taosArrayClear(pCommitter->aDelData);
......@@ -241,7 +241,7 @@ static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDel
}
// write
code = tsdbWriteDelData(pCommitter->pDelFWriter, pCommitter->aDelData, NULL, &delIdx);
code = tsdbWriteDelData(pCommitter->pDelFWriter, pCommitter->aDelData, &delIdx);
if (code) goto _err;
// put delIdx
......@@ -262,7 +262,7 @@ static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) {
int32_t code = 0;
STsdb *pTsdb = pCommitter->pTsdb;
code = tsdbWriteDelIdx(pCommitter->pDelFWriter, pCommitter->aDelIdxN, NULL);
code = tsdbWriteDelIdx(pCommitter->pDelFWriter, pCommitter->aDelIdxN);
if (code) goto _err;
code = tsdbUpdateDelFileHdr(pCommitter->pDelFWriter);
......
......@@ -1618,7 +1618,7 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
goto _err;
}
code = tsdbReadDelIdx(pDelFReader, aDelIdx, NULL);
code = tsdbReadDelIdx(pDelFReader, aDelIdx);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(aDelIdx);
tsdbDelFReaderClose(&pDelFReader);
......@@ -1629,7 +1629,7 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
SDelIdx* pIdx = taosArraySearch(aDelIdx, &idx, tCmprDelIdx, TD_EQ);
if (pIdx != NULL) {
code = tsdbReadDelData(pDelFReader, pIdx, pDelData, NULL);
code = tsdbReadDelData(pDelFReader, pIdx, pDelData);
}
taosArrayDestroy(aDelIdx);
......
......@@ -76,6 +76,9 @@ int32_t tsdbDelFWriterClose(SDelFWriter **ppWriter, int8_t sync) {
goto _err;
}
tFree(pWriter->pBuf1);
taosMemoryFree(pWriter);
*ppWriter = NULL;
return code;
......@@ -84,39 +87,34 @@ _err:
return code;
}
int32_t tsdbWriteDelData(SDelFWriter *pWriter, SArray *aDelData, uint8_t **ppBuf, SDelIdx *pDelIdx) {
int32_t code = 0;
uint8_t *pBuf = NULL;
int64_t size;
int64_t n;
SDiskDataHdr hdr = {.delimiter = TSDB_FILE_DLMT, .suid = pDelIdx->suid, .uid = pDelIdx->uid};
if (!ppBuf) ppBuf = &pBuf;
int32_t tsdbWriteDelData(SDelFWriter *pWriter, SArray *aDelData, SDelIdx *pDelIdx) {
int32_t code = 0;
int64_t size;
int64_t n;
// prepare
size = sizeof(hdr);
size = sizeof(uint32_t);
for (int32_t iDelData = 0; iDelData < taosArrayGetSize(aDelData); iDelData++) {
size += tPutDelData(NULL, taosArrayGet(aDelData, iDelData));
}
size += sizeof(TSCKSUM);
// alloc
code = tRealloc(ppBuf, size);
code = tRealloc(&pWriter->pBuf1, size);
if (code) goto _err;
// build
n = 0;
*(SDiskDataHdr *)(*ppBuf) = hdr;
n += sizeof(hdr);
n += tPutU32(pWriter->pBuf1 + n, TSDB_FILE_DLMT);
for (int32_t iDelData = 0; iDelData < taosArrayGetSize(aDelData); iDelData++) {
n += tPutDelData(*ppBuf + n, taosArrayGet(aDelData, iDelData));
n += tPutDelData(pWriter->pBuf1 + n, taosArrayGet(aDelData, iDelData));
}
taosCalcChecksumAppend(0, *ppBuf, size);
taosCalcChecksumAppend(0, pWriter->pBuf1, size);
ASSERT(n + sizeof(TSCKSUM) == size);
// write
n = taosWriteFile(pWriter->pWriteH, *ppBuf, size);
n = taosWriteFile(pWriter->pWriteH, pWriter->pBuf1, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
......@@ -129,48 +127,42 @@ int32_t tsdbWriteDelData(SDelFWriter *pWriter, SArray *aDelData, uint8_t **ppBuf
pDelIdx->size = size;
pWriter->fDel.size += size;
tFree(pBuf);
return code;
_err:
tsdbError("vgId:%d, failed to write del data since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
tFree(pBuf);
return code;
}
int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SArray *aDelIdx, uint8_t **ppBuf) {
int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SArray *aDelIdx) {
int32_t code = 0;
int64_t size;
int64_t n;
uint8_t *pBuf = NULL;
SDelIdx *pDelIdx;
if (!ppBuf) ppBuf = &pBuf;
// prepare
size = 0;
size += tPutU32(NULL, TSDB_FILE_DLMT);
size = sizeof(uint32_t);
for (int32_t iDelIdx = 0; iDelIdx < taosArrayGetSize(aDelIdx); iDelIdx++) {
size += tPutDelIdx(NULL, taosArrayGet(aDelIdx, iDelIdx));
}
size += sizeof(TSCKSUM);
// alloc
code = tRealloc(ppBuf, size);
code = tRealloc(&pWriter->pBuf1, size);
if (code) goto _err;
// build
n = 0;
n += tPutU32(*ppBuf + n, TSDB_FILE_DLMT);
n += tPutU32(pWriter->pBuf1 + n, TSDB_FILE_DLMT);
for (int32_t iDelIdx = 0; iDelIdx < taosArrayGetSize(aDelIdx); iDelIdx++) {
n += tPutDelIdx(*ppBuf + n, taosArrayGet(aDelIdx, iDelIdx));
n += tPutDelIdx(pWriter->pBuf1 + n, taosArrayGet(aDelIdx, iDelIdx));
}
taosCalcChecksumAppend(0, *ppBuf, size);
taosCalcChecksumAppend(0, pWriter->pBuf1, size);
ASSERT(n + sizeof(TSCKSUM) == size);
// write
n = taosWriteFile(pWriter->pWriteH, *ppBuf, size);
n = taosWriteFile(pWriter->pWriteH, pWriter->pBuf1, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
......@@ -180,12 +172,10 @@ int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SArray *aDelIdx, uint8_t **ppBuf)
pWriter->fDel.offset = pWriter->fDel.size;
pWriter->fDel.size += size;
tFree(pBuf);
return code;
_err:
tsdbError("vgId:%d, write del idx failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
tFree(pBuf);
return code;
}
......@@ -225,9 +215,11 @@ struct SDelFReader {
STsdb *pTsdb;
SDelFile fDel;
TdFilePtr pReadH;
uint8_t *pBuf1;
};
int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb, uint8_t **ppBuf) {
int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb) {
int32_t code = 0;
char fname[TSDB_FILENAME_LEN];
SDelFReader *pDelFReader;
......@@ -252,32 +244,6 @@ int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb
goto _err;
}
#if 0
// load and check hdr if buffer is given
if (ppBuf) {
code = tRealloc(ppBuf, TSDB_FHDR_SIZE);
if (code) {
goto _err;
}
n = taosReadFile(pDelFReader->pReadH, *ppBuf, TSDB_FHDR_SIZE);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
} else if (n < TSDB_FHDR_SIZE) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
if (!taosCheckChecksumWhole(*ppBuf, TSDB_FHDR_SIZE)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
// TODO: check the content
}
#endif
_exit:
*ppReader = pDelFReader;
return code;
......@@ -297,6 +263,7 @@ int32_t tsdbDelFReaderClose(SDelFReader **ppReader) {
code = TAOS_SYSTEM_ERROR(errno);
goto _exit;
}
tFree(pReader->pBuf1);
taosMemoryFree(pReader);
}
*ppReader = NULL;
......@@ -305,16 +272,13 @@ _exit:
return code;
}
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData, uint8_t **ppBuf) {
int32_t code = 0;
int64_t offset = pDelIdx->offset;
int64_t size = pDelIdx->size;
int64_t n;
uint8_t *pBuf = NULL;
SDiskDataHdr *pHdr;
SDelData *pDelData = &(SDelData){0};
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData) {
int32_t code = 0;
int64_t offset = pDelIdx->offset;
int64_t size = pDelIdx->size;
int64_t n;
if (!ppBuf) ppBuf = &pBuf;
taosArrayClear(aDelData);
// seek
if (taosLSeekFile(pReader->pReadH, offset, SEEK_SET) < 0) {
......@@ -323,11 +287,11 @@ int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData
}
// alloc
code = tRealloc(ppBuf, size);
code = tRealloc(&pReader->pBuf1, size);
if (code) goto _err;
// read
n = taosReadFile(pReader->pReadH, *ppBuf, size);
n = taosReadFile(pReader->pReadH, pReader->pBuf1, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
......@@ -337,23 +301,21 @@ int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData
}
// check
if (!taosCheckChecksumWhole(*ppBuf, size)) {
if (!taosCheckChecksumWhole(pReader->pBuf1, size)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
// // decode
n = 0;
pHdr = (SDiskDataHdr *)(*ppBuf + n);
ASSERT(pHdr->delimiter == TSDB_FILE_DLMT);
ASSERT(pHdr->suid == pDelIdx->suid);
ASSERT(pHdr->uid == pDelIdx->uid);
n += sizeof(*pHdr);
taosArrayClear(aDelData);
uint32_t delimiter;
n += tGetU32(pReader->pBuf1 + n, &delimiter);
while (n < size - sizeof(TSCKSUM)) {
n += tGetDelData(*ppBuf + n, pDelData);
SDelData delData;
n += tGetDelData(pReader->pBuf1 + n, &delData);
if (taosArrayPush(aDelData, pDelData) == NULL) {
if (taosArrayPush(aDelData, &delData) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
......@@ -361,25 +323,20 @@ int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData
ASSERT(n == size - sizeof(TSCKSUM));
tFree(pBuf);
return code;
_err:
tsdbError("vgId:%d, read del data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
tFree(pBuf);
return code;
}
int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx, uint8_t **ppBuf) {
int32_t code = 0;
int32_t n;
int64_t offset = pReader->fDel.offset;
int64_t size = pReader->fDel.size - offset;
uint32_t delimiter;
uint8_t *pBuf = NULL;
SDelIdx *pDelIdx = &(SDelIdx){0};
int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx) {
int32_t code = 0;
int32_t n;
int64_t offset = pReader->fDel.offset;
int64_t size = pReader->fDel.size - offset;
if (!ppBuf) ppBuf = &pBuf;
taosArrayClear(aDelIdx);
// seek
if (taosLSeekFile(pReader->pReadH, offset, SEEK_SET) < 0) {
......@@ -388,11 +345,11 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx, uint8_t **ppBuf) {
}
// alloc
code = tRealloc(ppBuf, size);
code = tRealloc(&pReader->pBuf1, size);
if (code) goto _err;
// read
n = taosReadFile(pReader->pReadH, *ppBuf, size);
n = taosReadFile(pReader->pReadH, pReader->pBuf1, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
......@@ -402,21 +359,23 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx, uint8_t **ppBuf) {
}
// check
if (!taosCheckChecksumWhole(*ppBuf, size)) {
if (!taosCheckChecksumWhole(pReader->pBuf1, size)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
// decode
n = 0;
n += tGetU32(*ppBuf + n, &delimiter);
uint32_t delimiter;
n += tGetU32(pReader->pBuf1 + n, &delimiter);
ASSERT(delimiter == TSDB_FILE_DLMT);
taosArrayClear(aDelIdx);
while (n < size - sizeof(TSCKSUM)) {
n += tGetDelIdx(*ppBuf + n, pDelIdx);
SDelIdx delIdx;
n += tGetDelIdx(pReader->pBuf1 + n, &delIdx);
if (taosArrayPush(aDelIdx, pDelIdx) == NULL) {
if (taosArrayPush(aDelIdx, &delIdx) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
......@@ -424,12 +383,10 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx, uint8_t **ppBuf) {
ASSERT(n == size - sizeof(TSCKSUM));
tFree(pBuf);
return code;
_err:
tsdbError("vgId:%d, read del idx failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
tFree(pBuf);
return code;
}
......
......@@ -177,7 +177,7 @@ static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
if (code) goto _err;
// read index
code = tsdbReadDelIdx(pReader->pDelFReader, pReader->aDelIdx, NULL);
code = tsdbReadDelIdx(pReader->pDelFReader, pReader->aDelIdx);
if (code) goto _err;
pReader->iDelIdx = 0;
......@@ -193,7 +193,7 @@ static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
pReader->iDelIdx++;
code = tsdbReadDelData(pReader->pDelFReader, pDelIdx, pReader->aDelData, NULL);
code = tsdbReadDelData(pReader->pDelFReader, pDelIdx, pReader->aDelData);
if (code) goto _err;
int32_t size = 0;
......@@ -968,7 +968,7 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32
code = tsdbDelFReaderOpen(&pWriter->pDelFReader, pDelFile, pTsdb, NULL);
if (code) goto _err;
code = tsdbReadDelIdx(pWriter->pDelFReader, pWriter->aDelIdxR, NULL);
code = tsdbReadDelIdx(pWriter->pDelFReader, pWriter->aDelIdxR);
if (code) goto _err;
}
......@@ -997,7 +997,7 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32
if (c < 0) {
goto _new_del;
} else {
code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData, NULL);
code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData);
if (code) goto _err;
pWriter->iDelIdx++;
......@@ -1027,7 +1027,7 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32
}
_write_del:
code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, NULL, &delIdx);
code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, &delIdx);
if (code) goto _err;
if (taosArrayPush(pWriter->aDelIdxW, &delIdx) == NULL) {
......@@ -1058,11 +1058,11 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) {
for (; pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdxR); pWriter->iDelIdx++) {
SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx);
code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData, NULL);
code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData);
if (code) goto _err;
SDelIdx delIdx = (SDelIdx){.suid = pDelIdx->suid, .uid = pDelIdx->uid};
code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, NULL, &delIdx);
code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, &delIdx);
if (code) goto _err;
if (taosArrayPush(pWriter->aDelIdxR, &delIdx) == NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册