未验证 提交 bde64d07 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2472 from taosdata/feature/2.0tsdb

Feature/2.0tsdb
...@@ -371,9 +371,11 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { ...@@ -371,9 +371,11 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
if (keepData) { if (keepData) {
pRet->cols[i].len = pDataCols->cols[i].len; pRet->cols[i].len = pDataCols->cols[i].len;
memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len); if (pDataCols->cols[i].len > 0) {
if (pRet->cols[i].type == TSDB_DATA_TYPE_BINARY || pRet->cols[i].type == TSDB_DATA_TYPE_NCHAR) { memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len);
memcpy(pRet->cols[i].dataOff, pDataCols->cols[i].dataOff, sizeof(VarDataOffsetT) * pDataCols->maxPoints); if (pRet->cols[i].type == TSDB_DATA_TYPE_BINARY || pRet->cols[i].type == TSDB_DATA_TYPE_NCHAR) {
memcpy(pRet->cols[i].dataOff, pDataCols->cols[i].dataOff, sizeof(VarDataOffsetT) * pDataCols->maxPoints);
}
} }
} }
} }
...@@ -443,8 +445,10 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) { ...@@ -443,8 +445,10 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) {
if (dataColsKeyLast(target) < dataColsKeyFirst(source)) { // No overlap if (dataColsKeyLast(target) < dataColsKeyFirst(source)) { // No overlap
for (int i = 0; i < rowsToMerge; i++) { for (int i = 0; i < rowsToMerge; i++) {
for (int j = 0; j < source->numOfCols; j++) { for (int j = 0; j < source->numOfCols; j++) {
dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i), target->numOfRows, if (source->cols[j].len > 0) {
target->maxPoints); dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i), target->numOfRows,
target->maxPoints);
}
} }
target->numOfRows++; target->numOfRows++;
} }
...@@ -479,8 +483,10 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limi ...@@ -479,8 +483,10 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limi
if (key1 <= key2) { if (key1 <= key2) {
for (int i = 0; i < src1->numOfCols; i++) { for (int i = 0; i < src1->numOfCols; i++) {
ASSERT(target->cols[i].type == src1->cols[i].type); ASSERT(target->cols[i].type == src1->cols[i].type);
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows, if (src1->cols[i].len > 0) {
target->maxPoints); dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows,
target->maxPoints);
}
} }
target->numOfRows++; target->numOfRows++;
...@@ -489,8 +495,10 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limi ...@@ -489,8 +495,10 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limi
} else { } else {
for (int i = 0; i < src2->numOfCols; i++) { for (int i = 0; i < src2->numOfCols; i++) {
ASSERT(target->cols[i].type == src2->cols[i].type); ASSERT(target->cols[i].type == src2->cols[i].type);
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src2->cols + i, *iter2), target->numOfRows, if (src2->cols[i].len > 0) {
target->maxPoints); dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src2->cols + i, *iter2), target->numOfRows,
target->maxPoints);
}
} }
target->numOfRows++; target->numOfRows++;
......
...@@ -383,8 +383,8 @@ int tsdbLoadCompIdx(SRWHelper* pHelper, void* target); ...@@ -383,8 +383,8 @@ int tsdbLoadCompIdx(SRWHelper* pHelper, void* target);
int tsdbLoadCompInfo(SRWHelper* pHelper, void* target); int tsdbLoadCompInfo(SRWHelper* pHelper, void* target);
int tsdbLoadCompData(SRWHelper* phelper, SCompBlock* pcompblock, void* target); int tsdbLoadCompData(SRWHelper* phelper, SCompBlock* pcompblock, void* target);
void tsdbGetDataStatis(SRWHelper* pHelper, SDataStatis* pStatis, int numOfCols); void tsdbGetDataStatis(SRWHelper* pHelper, SDataStatis* pStatis, int numOfCols);
int tsdbLoadBlockDataCols(SRWHelper* pHelper, SDataCols* pDataCols, int blkIdx, int16_t* colIds, int numOfColIds); int tsdbLoadBlockDataCols(SRWHelper* pHelper, SCompBlock* pCompBlock, int16_t* colIds, int numOfColIds);
int tsdbLoadBlockData(SRWHelper* pHelper, SCompBlock* pCompBlock, SDataCols* target); int tsdbLoadBlockData(SRWHelper* pHelper, SCompBlock* pCompBlock);
// ------------------ tsdbMain.c // ------------------ tsdbMain.c
#define REPO_ID(r) (r)->config.tsdbId #define REPO_ID(r) (r)->config.tsdbId
......
...@@ -20,6 +20,8 @@ ...@@ -20,6 +20,8 @@
#include "tscompression.h" #include "tscompression.h"
#include "tsdbMain.h" #include "tsdbMain.h"
#define TSDB_GET_COMPCOL_LEN(nCols) (sizeof(SCompData) + sizeof(SCompCol) * (nCols) + sizeof(TSCKSUM))
static bool tsdbShouldCreateNewLast(SRWHelper *pHelper); static bool tsdbShouldCreateNewLast(SRWHelper *pHelper);
static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite, static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite,
SCompBlock *pCompBlock, bool isLast, bool isSuperBlock); SCompBlock *pCompBlock, bool isLast, bool isSuperBlock);
...@@ -42,17 +44,16 @@ static void tsdbResetHelperBlockImpl(SRWHelper *pHelper); ...@@ -42,17 +44,16 @@ static void tsdbResetHelperBlockImpl(SRWHelper *pHelper);
static void tsdbResetHelperBlock(SRWHelper *pHelper); static void tsdbResetHelperBlock(SRWHelper *pHelper);
static int tsdbInitHelperBlock(SRWHelper *pHelper); static int tsdbInitHelperBlock(SRWHelper *pHelper);
static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t type); static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t type);
static int comparColIdCompCol(const void *arg1, const void *arg2);
static int comparColIdDataCol(const void *arg1, const void *arg2);
static int tsdbLoadSingleColumnData(int fd, SCompBlock *pCompBlock, SCompCol *pCompCol, void *buf);
static int tsdbLoadSingleBlockDataCols(SRWHelper *pHelper, SCompBlock *pCompBlock, int16_t *colIds, int numOfColIds,
SDataCols *pDataCols);
static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32_t len, int8_t comp, int numOfRows, static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32_t len, int8_t comp, int numOfRows,
int maxPoints, char *buffer, int bufferSize); int maxPoints, char *buffer, int bufferSize);
static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols, int16_t *colIds,
int numOfColIds);
static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols); static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols);
static int tsdbEncodeSCompIdx(void **buf, SCompIdx *pIdx); static int tsdbEncodeSCompIdx(void **buf, SCompIdx *pIdx);
static void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx); static void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx);
static void tsdbDestroyHelperBlock(SRWHelper *pHelper); static void tsdbDestroyHelperBlock(SRWHelper *pHelper);
static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SCompBlock *pCompBlock, SCompCol *pCompCol,
SDataCol *pDataCol);
// ---------------------- INTERNAL FUNCTIONS ---------------------- // ---------------------- INTERNAL FUNCTIONS ----------------------
int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo) { int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo) {
...@@ -162,8 +163,8 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { ...@@ -162,8 +163,8 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
if (pHelper->files.lastF.fd > 0) { if (pHelper->files.lastF.fd > 0) {
if (helperType(pHelper) == TSDB_WRITE_HELPER) { if (helperType(pHelper) == TSDB_WRITE_HELPER) {
fsync(pHelper->files.lastF.fd); fsync(pHelper->files.lastF.fd);
close(pHelper->files.lastF.fd);
} }
close(pHelper->files.lastF.fd);
pHelper->files.lastF.fd = -1; pHelper->files.lastF.fd = -1;
} }
if (helperType(pHelper) == TSDB_WRITE_HELPER) { if (helperType(pHelper) == TSDB_WRITE_HELPER) {
...@@ -315,7 +316,7 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) { ...@@ -315,7 +316,7 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
ASSERT(pCompBlock->last); ASSERT(pCompBlock->last);
if (pCompBlock->numOfSubBlocks > 1) { if (pCompBlock->numOfSubBlocks > 1) {
if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, pIdx->numOfBlocks - 1), NULL) < 0) return -1; if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, pIdx->numOfBlocks - 1)) < 0) return -1;
ASSERT(pHelper->pDataCols[0]->numOfRows > 0 && pHelper->pDataCols[0]->numOfRows < pCfg->minRowsPerFileBlock); ASSERT(pHelper->pDataCols[0]->numOfRows > 0 && pHelper->pDataCols[0]->numOfRows < pCfg->minRowsPerFileBlock);
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0], if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0],
pHelper->pDataCols[0]->numOfRows, &compBlock, true, true) < 0) pHelper->pDataCols[0]->numOfRows, &compBlock, true, true) < 0)
...@@ -510,14 +511,34 @@ int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) { ...@@ -510,14 +511,34 @@ int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) {
int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target) { int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target) {
ASSERT(pCompBlock->numOfSubBlocks <= 1); ASSERT(pCompBlock->numOfSubBlocks <= 1);
int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd; SFile *pFile = (pCompBlock->last) ? &(pHelper->files.lastF) : &(pHelper->files.dataF);
if (lseek(fd, pCompBlock->offset, SEEK_SET) < 0) return -1; if (lseek(pFile->fd, pCompBlock->offset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols + sizeof(TSCKSUM); size_t tsize = TSDB_GET_COMPCOL_LEN(pCompBlock->numOfCols);
pHelper->pCompData = trealloc((void *)pHelper->pCompData, tsize); pHelper->pCompData = trealloc((void *)pHelper->pCompData, tsize);
if (pHelper->pCompData == NULL) return -1; if (pHelper->pCompData == NULL) {
if (tread(fd, (void *)pHelper->pCompData, tsize) < tsize) return -1; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
if (tread(pFile->fd, (void *)pHelper->pCompData, tsize) < tsize) {
tsdbError("vgId:%d failed to read %zu bytes from file %s since %s", REPO_ID(pHelper->pRepo), tsize, pFile->fname,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (!taosCheckChecksumWhole((uint8_t *)pHelper->pCompData, tsize)) {
tsdbError("vgId:%d file %s is broken, offset %" PRId64 " size %zu", REPO_ID(pHelper->pRepo), pFile->fname,
(int64_t)pCompBlock->offset, tsize);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1;
}
ASSERT(pCompBlock->numOfCols == pHelper->pCompData->numOfCols); ASSERT(pCompBlock->numOfCols == pHelper->pCompData->numOfCols);
...@@ -554,30 +575,31 @@ void tsdbGetDataStatis(SRWHelper *pHelper, SDataStatis *pStatis, int numOfCols) ...@@ -554,30 +575,31 @@ void tsdbGetDataStatis(SRWHelper *pHelper, SDataStatis *pStatis, int numOfCols)
} }
} }
int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int blkIdx, int16_t *colIds, int numOfColIds) { int tsdbLoadBlockDataCols(SRWHelper *pHelper, SCompBlock *pCompBlock, int16_t *colIds, int numOfColIds) {
SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx;
ASSERT(pCompBlock->numOfSubBlocks >= 1); // Must be super block ASSERT(pCompBlock->numOfSubBlocks >= 1); // Must be super block
int numOfSubBlocks = pCompBlock->numOfSubBlocks; int numOfSubBlocks = pCompBlock->numOfSubBlocks;
SCompBlock *pStartBlock = if (numOfSubBlocks > 1) pCompBlock = (SCompBlock *)POINTER_SHIFT(pHelper->pCompInfo, pCompBlock->offset);
(numOfSubBlocks == 1) ? pCompBlock : (SCompBlock *)((char *)pHelper->pCompInfo->blocks + pCompBlock->offset);
if (tsdbLoadSingleBlockDataCols(pHelper, pStartBlock, colIds, numOfColIds, pDataCols) < 0) return -1; tdResetDataCols(pHelper->pDataCols[0]);
if (tsdbLoadBlockDataColsImpl(pHelper, pCompBlock, pHelper->pDataCols[0], colIds, numOfColIds) < 0) goto _err;
for (int i = 1; i < numOfSubBlocks; i++) { for (int i = 1; i < numOfSubBlocks; i++) {
pStartBlock++; tdResetDataCols(pHelper->pDataCols[1]);
if (tsdbLoadSingleBlockDataCols(pHelper, pStartBlock, colIds, numOfColIds, pHelper->pDataCols[1]) < 0) return -1; pCompBlock++;
tdMergeDataCols(pDataCols, pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfRows); if (tsdbLoadBlockDataColsImpl(pHelper, pCompBlock, pHelper->pDataCols[1], colIds, numOfColIds) < 0) goto _err;
if (tdMergeDataCols(pHelper->pDataCols[0], pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfRows) < 0) goto _err;
} }
return 0; return 0;
_err:
return -1;
} }
int tsdbLoadBlockData(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *target) { int tsdbLoadBlockData(SRWHelper *pHelper, SCompBlock *pCompBlock) {
// SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx;
int numOfSubBlock = pCompBlock->numOfSubBlocks; int numOfSubBlock = pCompBlock->numOfSubBlocks;
if (numOfSubBlock > 1) pCompBlock = (SCompBlock *)((char *)pHelper->pCompInfo + pCompBlock->offset); if (numOfSubBlock > 1) pCompBlock = (SCompBlock *)POINTER_SHIFT(pHelper->pCompInfo, pCompBlock->offset);
tdResetDataCols(pHelper->pDataCols[0]); tdResetDataCols(pHelper->pDataCols[0]);
if (tsdbLoadBlockDataImpl(pHelper, pCompBlock, pHelper->pDataCols[0]) < 0) goto _err; if (tsdbLoadBlockDataImpl(pHelper, pCompBlock, pHelper->pDataCols[0]) < 0) goto _err;
...@@ -588,8 +610,6 @@ int tsdbLoadBlockData(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *tar ...@@ -588,8 +610,6 @@ int tsdbLoadBlockData(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *tar
if (tdMergeDataCols(pHelper->pDataCols[0], pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfRows) < 0) goto _err; if (tdMergeDataCols(pHelper->pDataCols[0], pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfRows) < 0) goto _err;
} }
// if (target) TODO
return 0; return 0;
_err: _err:
...@@ -648,7 +668,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa ...@@ -648,7 +668,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
// Compress the data if neccessary // Compress the data if neccessary
int tcol = 0; int tcol = 0;
int32_t toffset = 0; int32_t toffset = 0;
int32_t tsize = sizeof(SCompData) + sizeof(SCompCol) * nColsNotAllNull + sizeof(TSCKSUM); int32_t tsize = TSDB_GET_COMPCOL_LEN(nColsNotAllNull);
int32_t lsize = tsize; int32_t lsize = tsize;
for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) { for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) {
if (tcol >= nColsNotAllNull) break; if (tcol >= nColsNotAllNull) break;
...@@ -770,7 +790,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa ...@@ -770,7 +790,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err; if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err;
} else { } else {
// Load // Load
if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err; if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx)) < 0) goto _err;
ASSERT(pHelper->pDataCols[0]->numOfRows <= blockAtIdx(pHelper, blkIdx)->numOfRows); ASSERT(pHelper->pDataCols[0]->numOfRows <= blockAtIdx(pHelper, blkIdx)->numOfRows);
// Merge // Merge
if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsWritten) < 0) goto _err; if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsWritten) < 0) goto _err;
...@@ -826,7 +846,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa ...@@ -826,7 +846,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err; if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err;
} else { // Load-Merge-Write } else { // Load-Merge-Write
// Load // Load
if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err; if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx)) < 0) goto _err;
if (blockAtIdx(pHelper, blkIdx)->last) pHelper->hasOldLastBlock = false; if (blockAtIdx(pHelper, blkIdx)->last) pHelper->hasOldLastBlock = false;
rowsWritten = rows3; rowsWritten = rows3;
...@@ -1183,52 +1203,13 @@ _err: ...@@ -1183,52 +1203,13 @@ _err:
return -1; return -1;
} }
static int comparColIdCompCol(const void *arg1, const void *arg2) {
return (*(int16_t *)arg1) - ((SCompCol *)arg2)->colId;
}
static int comparColIdDataCol(const void *arg1, const void *arg2) {
return (*(int16_t *)arg1) - ((SDataCol *)arg2)->colId;
}
static int tsdbLoadSingleColumnData(int fd, SCompBlock *pCompBlock, SCompCol *pCompCol, void *buf) {
size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols;
if (lseek(fd, pCompBlock->offset + tsize + pCompCol->offset, SEEK_SET) < 0) return -1;
if (tread(fd, buf, pCompCol->len) < pCompCol->len) return -1;
return 0;
}
static int tsdbLoadSingleBlockDataCols(SRWHelper *pHelper, SCompBlock *pCompBlock, int16_t *colIds, int numOfColIds,
SDataCols *pDataCols) {
if (tsdbLoadCompData(pHelper, pCompBlock, NULL) < 0) return -1;
int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd;
void *ptr = NULL;
for (int i = 0; i < numOfColIds; i++) {
int16_t colId = colIds[i];
ptr = bsearch((void *)&colId, (void *)pHelper->pCompData->cols, pHelper->pCompData->numOfCols, sizeof(SCompCol),
comparColIdCompCol);
if (ptr == NULL) continue;
SCompCol *pCompCol = (SCompCol *)ptr;
ptr =
bsearch((void *)&colId, (void *)(pDataCols->cols), pDataCols->numOfCols, sizeof(SDataCol), comparColIdDataCol);
ASSERT(ptr != NULL);
SDataCol *pDataCol = (SDataCol *)ptr;
pDataCol->len = pCompCol->len;
if (tsdbLoadSingleColumnData(fd, pCompBlock, pCompCol, pDataCol->pData) < 0) return -1;
}
return 0;
}
static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32_t len, int8_t comp, int numOfRows, static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32_t len, int8_t comp, int numOfRows,
int maxPoints, char *buffer, int bufferSize) { int maxPoints, char *buffer, int bufferSize) {
// Verify by checksum // Verify by checksum
if (!taosCheckChecksumWhole((uint8_t *)content, len)) return -1; if (!taosCheckChecksumWhole((uint8_t *)content, len)) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1;
}
// Decode the data // Decode the data
if (comp) { if (comp) {
...@@ -1249,10 +1230,97 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32 ...@@ -1249,10 +1230,97 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32
return 0; return 0;
} }
static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols) { static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SCompBlock *pCompBlock, SCompCol *pCompCol,
SDataCol *pDataCol) {
ASSERT(pDataCol->colId == pCompCol->colId);
int tsize = pDataCol->bytes * pCompBlock->numOfRows + COMP_OVERFLOW_BYTES;
pHelper->pBuffer = trealloc(pHelper->pBuffer, pCompCol->len);
if (pHelper->pBuffer == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
pHelper->compBuffer = trealloc(pHelper->compBuffer, tsize);
if (pHelper->compBuffer == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
if (lseek(pFile->fd, pCompCol->offset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (tread(pFile->fd, pHelper->pBuffer, pCompCol->len) < pCompCol->len) {
tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pCompCol->len, pFile->fname,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (tsdbCheckAndDecodeColumnData(pDataCol, pHelper->pBuffer, pCompCol->len, pCompBlock->algorithm,
pCompBlock->numOfRows, pHelper->pRepo->config.maxRowsPerFileBlock,
pHelper->compBuffer, tsizeof(pHelper->compBuffer)) < 0) {
tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pHelper->pRepo), pFile->fname, pCompCol->colId,
(int64_t)pCompCol->offset);
return -1;
}
return 0;
}
static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols, int16_t *colIds, int numOfColIds) {
ASSERT(pCompBlock->numOfSubBlocks <= 1); ASSERT(pCompBlock->numOfSubBlocks <= 1);
SCompData *pCompData = (SCompData *)pHelper->pBuffer; SFile * pFile = (pCompBlock->last) ? &(pHelper->files.lastF) : &(pHelper->files.dataF);
if (tsdbLoadCompData(pHelper, pCompBlock, NULL) < 0) goto _err;
int dcol = 0;
int ccol = 0;
for (int i = 0; i < numOfColIds; i++) {
int16_t colId = colIds[i];
SDataCol *pDataCol = NULL;
SCompCol *pCompCol = NULL;
while (true) {
ASSERT(dcol < pDataCols->numOfCols);
pDataCol = &pDataCols->cols[dcol];
ASSERT(pDataCol->colId <= colId);
if (pDataCol->colId == colId) break;
dcol++;
}
ASSERT(pDataCol->colId == colId);
while (ccol < pCompBlock->numOfCols) {
pCompCol = &pHelper->pCompData->cols[ccol];
if (pCompCol->colId >= colId) break;
ccol++;
}
if (ccol >= pCompBlock->numOfCols || pCompCol->colId > colId) {
dataColSetNEleNull(pDataCol, pCompBlock->numOfRows, pDataCols->maxPoints);
dcol++;
continue;
}
ASSERT(pCompCol->colId == pDataCol->colId);
if (tsdbLoadColData(pHelper, pFile, pCompBlock, pCompCol, pDataCol) < 0) goto _err;
dcol++;
ccol++;
}
return 0;
_err:
return -1;
}
static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols) {
ASSERT(pCompBlock->numOfSubBlocks <= 1);
SFile *pFile = (pCompBlock->last) ? &(pHelper->files.lastF) : &(pHelper->files.dataF); SFile *pFile = (pCompBlock->last) ? &(pHelper->files.lastF) : &(pHelper->files.dataF);
...@@ -1262,6 +1330,8 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa ...@@ -1262,6 +1330,8 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
goto _err; goto _err;
} }
SCompData *pCompData = (SCompData *)pHelper->pBuffer;
int fd = pFile->fd; int fd = pFile->fd;
if (lseek(fd, pCompBlock->offset, SEEK_SET) < 0) { if (lseek(fd, pCompBlock->offset, SEEK_SET) < 0) {
tsdbError("vgId:%d tid:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid, tsdbError("vgId:%d tid:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid,
...@@ -1277,7 +1347,7 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa ...@@ -1277,7 +1347,7 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
} }
ASSERT(pCompData->numOfCols == pCompBlock->numOfCols); ASSERT(pCompData->numOfCols == pCompBlock->numOfCols);
int32_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols + sizeof(TSCKSUM); int32_t tsize = TSDB_GET_COMPCOL_LEN(pCompBlock->numOfCols);
if (!taosCheckChecksumWhole((uint8_t *)pCompData, tsize)) { if (!taosCheckChecksumWhole((uint8_t *)pCompData, tsize)) {
tsdbError("vgId:%d file %s block data is corrupted offset %" PRId64 " len %d", REPO_ID(pHelper->pRepo), tsdbError("vgId:%d file %s block data is corrupted offset %" PRId64 " len %d", REPO_ID(pHelper->pRepo),
pFile->fname, (int64_t)(pCompBlock->offset), pCompBlock->len); pFile->fname, (int64_t)(pCompBlock->offset), pCompBlock->len);
...@@ -1315,8 +1385,11 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa ...@@ -1315,8 +1385,11 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
} }
if (tsdbCheckAndDecodeColumnData(pDataCol, (char *)pCompData + tsize + pCompCol->offset, pCompCol->len, if (tsdbCheckAndDecodeColumnData(pDataCol, (char *)pCompData + tsize + pCompCol->offset, pCompCol->len,
pCompBlock->algorithm, pCompBlock->numOfRows, pDataCols->maxPoints, pCompBlock->algorithm, pCompBlock->numOfRows, pDataCols->maxPoints,
pHelper->compBuffer, tsizeof(pHelper->compBuffer)) < 0) pHelper->compBuffer, tsizeof(pHelper->compBuffer)) < 0) {
tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pHelper->pRepo), pFile->fname,
pCompCol->colId, (int64_t)pCompCol->offset);
goto _err; goto _err;
}
dcol++; dcol++;
ccol++; ccol++;
} else if (pCompCol->colId < pDataCol->colId) { } else if (pCompCol->colId < pDataCol->colId) {
......
...@@ -599,7 +599,7 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo ...@@ -599,7 +599,7 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
tdInitDataCols(pCheckInfo->pDataCols, tsdbGetTableSchema(pCheckInfo->pTableObj)); tdInitDataCols(pCheckInfo->pDataCols, tsdbGetTableSchema(pCheckInfo->pTableObj));
if (tsdbLoadBlockData(&(pQueryHandle->rhelper), pBlock, NULL) == 0) { if (tsdbLoadBlockData(&(pQueryHandle->rhelper), pBlock) == 0) {
SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo; SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo;
pBlockLoadInfo->fileGroup = pQueryHandle->pFileGroup; pBlockLoadInfo->fileGroup = pQueryHandle->pFileGroup;
......
...@@ -109,15 +109,23 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -109,15 +109,23 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
pRet->rsp = pRsp; pRet->rsp = pRsp;
// current connect is broken // current connect is broken
if (vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, pQInfo, pVnode->vgId) != TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
vError("vgId:%d, QInfo:%p, dnode query discarded since link is broken, %p", pVnode->vgId, pQInfo, pReadMsg->rpcMsg.handle); if (vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, pQInfo, pVnode->vgId) != TSDB_CODE_SUCCESS) {
pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; vError("vgId:%d, QInfo:%p, dnode query discarded since link is broken, %p", pVnode->vgId, pQInfo,
pReadMsg->rpcMsg.handle);
//NOTE: there two refcount, needs to kill twice, todo refactor pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
qKillQuery(pQInfo, vnodeRelease, pVnode);
qKillQuery(pQInfo, vnodeRelease, pVnode); // NOTE: there two refcount, needs to kill twice, todo refactor
qKillQuery(pQInfo, vnodeRelease, pVnode);
return pRsp->code; qKillQuery(pQInfo, vnodeRelease, pVnode);
return pRsp->code;
}
vTrace("vgId:%d, QInfo:%p, dnode query msg disposed", pVnode->vgId, pQInfo);
} else {
assert(pQInfo == NULL);
vnodeRelease(pVnode);
} }
vDebug("vgId:%d, QInfo:%p, dnode query msg disposed", pVnode->vgId, pQInfo); vDebug("vgId:%d, QInfo:%p, dnode query msg disposed", pVnode->vgId, pQInfo);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册