diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index cd1f4b1ca447f326d4dfd8432252829fee43d7d7..5610f0f75e7c7830f6299c58accb8b7e05306763 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -371,9 +371,11 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { if (keepData) { pRet->cols[i].len = pDataCols->cols[i].len; - memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len); - if (pRet->cols[i].type == TSDB_DATA_TYPE_BINARY || pRet->cols[i].type == TSDB_DATA_TYPE_NCHAR) { - memcpy(pRet->cols[i].dataOff, pDataCols->cols[i].dataOff, sizeof(VarDataOffsetT) * pDataCols->maxPoints); + if (pDataCols->cols[i].len > 0) { + memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len); + if (pRet->cols[i].type == TSDB_DATA_TYPE_BINARY || pRet->cols[i].type == TSDB_DATA_TYPE_NCHAR) { + memcpy(pRet->cols[i].dataOff, pDataCols->cols[i].dataOff, sizeof(VarDataOffsetT) * pDataCols->maxPoints); + } } } } @@ -443,8 +445,10 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) { if (dataColsKeyLast(target) < dataColsKeyFirst(source)) { // No overlap for (int i = 0; i < rowsToMerge; i++) { for (int j = 0; j < source->numOfCols; j++) { - dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i), target->numOfRows, - target->maxPoints); + if (source->cols[j].len > 0) { + dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i), target->numOfRows, + target->maxPoints); + } } target->numOfRows++; } @@ -479,8 +483,10 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limi if (key1 <= key2) { for (int i = 0; i < src1->numOfCols; i++) { ASSERT(target->cols[i].type == src1->cols[i].type); - dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows, - target->maxPoints); + if (src1->cols[i].len > 0) { + dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows, + target->maxPoints); + } } target->numOfRows++; @@ -489,8 +495,10 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limi } else { for (int i = 0; i < src2->numOfCols; i++) { ASSERT(target->cols[i].type == src2->cols[i].type); - dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src2->cols + i, *iter2), target->numOfRows, - target->maxPoints); + if (src2->cols[i].len > 0) { + dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src2->cols + i, *iter2), target->numOfRows, + target->maxPoints); + } } target->numOfRows++; diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index d80df4c9c37a45ef85ab0e10d57020f957da17a5..3ae88c3141eac390b5e8b65a7bdc8afc0c167a41 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -383,8 +383,8 @@ int tsdbLoadCompIdx(SRWHelper* pHelper, void* target); int tsdbLoadCompInfo(SRWHelper* pHelper, void* target); int tsdbLoadCompData(SRWHelper* phelper, SCompBlock* pcompblock, void* target); void tsdbGetDataStatis(SRWHelper* pHelper, SDataStatis* pStatis, int numOfCols); -int tsdbLoadBlockDataCols(SRWHelper* pHelper, SDataCols* pDataCols, int blkIdx, int16_t* colIds, int numOfColIds); -int tsdbLoadBlockData(SRWHelper* pHelper, SCompBlock* pCompBlock, SDataCols* target); +int tsdbLoadBlockDataCols(SRWHelper* pHelper, SCompBlock* pCompBlock, int16_t* colIds, int numOfColIds); +int tsdbLoadBlockData(SRWHelper* pHelper, SCompBlock* pCompBlock); // ------------------ tsdbMain.c #define REPO_ID(r) (r)->config.tsdbId diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 70bd9880f7971cb55516467adf4a2830b18a8b86..2338df1f97c8b218f89ee7622f3777af4377f852 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -20,6 +20,8 @@ #include "tscompression.h" #include "tsdbMain.h" +#define TSDB_GET_COMPCOL_LEN(nCols) (sizeof(SCompData) + sizeof(SCompCol) * (nCols) + sizeof(TSCKSUM)) + static bool tsdbShouldCreateNewLast(SRWHelper *pHelper); static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite, SCompBlock *pCompBlock, bool isLast, bool isSuperBlock); @@ -42,17 +44,16 @@ static void tsdbResetHelperBlockImpl(SRWHelper *pHelper); static void tsdbResetHelperBlock(SRWHelper *pHelper); static int tsdbInitHelperBlock(SRWHelper *pHelper); static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t type); -static int comparColIdCompCol(const void *arg1, const void *arg2); -static int comparColIdDataCol(const void *arg1, const void *arg2); -static int tsdbLoadSingleColumnData(int fd, SCompBlock *pCompBlock, SCompCol *pCompCol, void *buf); -static int tsdbLoadSingleBlockDataCols(SRWHelper *pHelper, SCompBlock *pCompBlock, int16_t *colIds, int numOfColIds, - SDataCols *pDataCols); static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32_t len, int8_t comp, int numOfRows, int maxPoints, char *buffer, int bufferSize); +static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols, int16_t *colIds, + int numOfColIds); static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols); static int tsdbEncodeSCompIdx(void **buf, SCompIdx *pIdx); static void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx); static void tsdbDestroyHelperBlock(SRWHelper *pHelper); +static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SCompBlock *pCompBlock, SCompCol *pCompCol, + SDataCol *pDataCol); // ---------------------- INTERNAL FUNCTIONS ---------------------- int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo) { @@ -162,8 +163,8 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { if (pHelper->files.lastF.fd > 0) { if (helperType(pHelper) == TSDB_WRITE_HELPER) { fsync(pHelper->files.lastF.fd); - close(pHelper->files.lastF.fd); } + close(pHelper->files.lastF.fd); pHelper->files.lastF.fd = -1; } if (helperType(pHelper) == TSDB_WRITE_HELPER) { @@ -315,7 +316,7 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) { ASSERT(pCompBlock->last); if (pCompBlock->numOfSubBlocks > 1) { - if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, pIdx->numOfBlocks - 1), NULL) < 0) return -1; + if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, pIdx->numOfBlocks - 1)) < 0) return -1; ASSERT(pHelper->pDataCols[0]->numOfRows > 0 && pHelper->pDataCols[0]->numOfRows < pCfg->minRowsPerFileBlock); if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0], pHelper->pDataCols[0]->numOfRows, &compBlock, true, true) < 0) @@ -510,14 +511,34 @@ int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) { int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target) { ASSERT(pCompBlock->numOfSubBlocks <= 1); - int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd; + SFile *pFile = (pCompBlock->last) ? &(pHelper->files.lastF) : &(pHelper->files.dataF); - if (lseek(fd, pCompBlock->offset, SEEK_SET) < 0) return -1; + if (lseek(pFile->fd, pCompBlock->offset, SEEK_SET) < 0) { + tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } - size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols + sizeof(TSCKSUM); + size_t tsize = TSDB_GET_COMPCOL_LEN(pCompBlock->numOfCols); pHelper->pCompData = trealloc((void *)pHelper->pCompData, tsize); - if (pHelper->pCompData == NULL) return -1; - if (tread(fd, (void *)pHelper->pCompData, tsize) < tsize) return -1; + if (pHelper->pCompData == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + if (tread(pFile->fd, (void *)pHelper->pCompData, tsize) < tsize) { + tsdbError("vgId:%d failed to read %zu bytes from file %s since %s", REPO_ID(pHelper->pRepo), tsize, pFile->fname, + strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + if (!taosCheckChecksumWhole((uint8_t *)pHelper->pCompData, tsize)) { + tsdbError("vgId:%d file %s is broken, offset %" PRId64 " size %zu", REPO_ID(pHelper->pRepo), pFile->fname, + (int64_t)pCompBlock->offset, tsize); + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + return -1; + } ASSERT(pCompBlock->numOfCols == pHelper->pCompData->numOfCols); @@ -554,30 +575,31 @@ void tsdbGetDataStatis(SRWHelper *pHelper, SDataStatis *pStatis, int numOfCols) } } -int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int blkIdx, int16_t *colIds, int numOfColIds) { - SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx; - +int tsdbLoadBlockDataCols(SRWHelper *pHelper, SCompBlock *pCompBlock, int16_t *colIds, int numOfColIds) { ASSERT(pCompBlock->numOfSubBlocks >= 1); // Must be super block - int numOfSubBlocks = pCompBlock->numOfSubBlocks; - SCompBlock *pStartBlock = - (numOfSubBlocks == 1) ? pCompBlock : (SCompBlock *)((char *)pHelper->pCompInfo->blocks + pCompBlock->offset); + int numOfSubBlocks = pCompBlock->numOfSubBlocks; + if (numOfSubBlocks > 1) pCompBlock = (SCompBlock *)POINTER_SHIFT(pHelper->pCompInfo, pCompBlock->offset); - if (tsdbLoadSingleBlockDataCols(pHelper, pStartBlock, colIds, numOfColIds, pDataCols) < 0) return -1; + tdResetDataCols(pHelper->pDataCols[0]); + if (tsdbLoadBlockDataColsImpl(pHelper, pCompBlock, pHelper->pDataCols[0], colIds, numOfColIds) < 0) goto _err; for (int i = 1; i < numOfSubBlocks; i++) { - pStartBlock++; - if (tsdbLoadSingleBlockDataCols(pHelper, pStartBlock, colIds, numOfColIds, pHelper->pDataCols[1]) < 0) return -1; - tdMergeDataCols(pDataCols, pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfRows); + tdResetDataCols(pHelper->pDataCols[1]); + pCompBlock++; + if (tsdbLoadBlockDataColsImpl(pHelper, pCompBlock, pHelper->pDataCols[1], colIds, numOfColIds) < 0) goto _err; + if (tdMergeDataCols(pHelper->pDataCols[0], pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfRows) < 0) goto _err; } return 0; + +_err: + return -1; } -int tsdbLoadBlockData(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *target) { - // SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx; +int tsdbLoadBlockData(SRWHelper *pHelper, SCompBlock *pCompBlock) { int numOfSubBlock = pCompBlock->numOfSubBlocks; - if (numOfSubBlock > 1) pCompBlock = (SCompBlock *)((char *)pHelper->pCompInfo + pCompBlock->offset); + if (numOfSubBlock > 1) pCompBlock = (SCompBlock *)POINTER_SHIFT(pHelper->pCompInfo, pCompBlock->offset); tdResetDataCols(pHelper->pDataCols[0]); if (tsdbLoadBlockDataImpl(pHelper, pCompBlock, pHelper->pDataCols[0]) < 0) goto _err; @@ -588,8 +610,6 @@ int tsdbLoadBlockData(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *tar if (tdMergeDataCols(pHelper->pDataCols[0], pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfRows) < 0) goto _err; } - // if (target) TODO - return 0; _err: @@ -648,7 +668,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa // Compress the data if neccessary int tcol = 0; int32_t toffset = 0; - int32_t tsize = sizeof(SCompData) + sizeof(SCompCol) * nColsNotAllNull + sizeof(TSCKSUM); + int32_t tsize = TSDB_GET_COMPCOL_LEN(nColsNotAllNull); int32_t lsize = tsize; for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) { if (tcol >= nColsNotAllNull) break; @@ -770,7 +790,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err; } else { // Load - if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err; + if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx)) < 0) goto _err; ASSERT(pHelper->pDataCols[0]->numOfRows <= blockAtIdx(pHelper, blkIdx)->numOfRows); // Merge if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsWritten) < 0) goto _err; @@ -826,7 +846,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err; } else { // Load-Merge-Write // Load - if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err; + if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx)) < 0) goto _err; if (blockAtIdx(pHelper, blkIdx)->last) pHelper->hasOldLastBlock = false; rowsWritten = rows3; @@ -1183,52 +1203,13 @@ _err: return -1; } -static int comparColIdCompCol(const void *arg1, const void *arg2) { - return (*(int16_t *)arg1) - ((SCompCol *)arg2)->colId; -} - -static int comparColIdDataCol(const void *arg1, const void *arg2) { - return (*(int16_t *)arg1) - ((SDataCol *)arg2)->colId; -} - -static int tsdbLoadSingleColumnData(int fd, SCompBlock *pCompBlock, SCompCol *pCompCol, void *buf) { - size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols; - if (lseek(fd, pCompBlock->offset + tsize + pCompCol->offset, SEEK_SET) < 0) return -1; - if (tread(fd, buf, pCompCol->len) < pCompCol->len) return -1; - - return 0; -} - -static int tsdbLoadSingleBlockDataCols(SRWHelper *pHelper, SCompBlock *pCompBlock, int16_t *colIds, int numOfColIds, - SDataCols *pDataCols) { - if (tsdbLoadCompData(pHelper, pCompBlock, NULL) < 0) return -1; - int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd; - - void *ptr = NULL; - for (int i = 0; i < numOfColIds; i++) { - int16_t colId = colIds[i]; - - ptr = bsearch((void *)&colId, (void *)pHelper->pCompData->cols, pHelper->pCompData->numOfCols, sizeof(SCompCol), - comparColIdCompCol); - if (ptr == NULL) continue; - SCompCol *pCompCol = (SCompCol *)ptr; - - ptr = - bsearch((void *)&colId, (void *)(pDataCols->cols), pDataCols->numOfCols, sizeof(SDataCol), comparColIdDataCol); - ASSERT(ptr != NULL); - SDataCol *pDataCol = (SDataCol *)ptr; - - pDataCol->len = pCompCol->len; - if (tsdbLoadSingleColumnData(fd, pCompBlock, pCompCol, pDataCol->pData) < 0) return -1; - } - - return 0; -} - static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32_t len, int8_t comp, int numOfRows, int maxPoints, char *buffer, int bufferSize) { // Verify by checksum - if (!taosCheckChecksumWhole((uint8_t *)content, len)) return -1; + if (!taosCheckChecksumWhole((uint8_t *)content, len)) { + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + return -1; + } // Decode the data if (comp) { @@ -1249,10 +1230,97 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32 return 0; } -static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols) { +static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SCompBlock *pCompBlock, SCompCol *pCompCol, + SDataCol *pDataCol) { + ASSERT(pDataCol->colId == pCompCol->colId); + int tsize = pDataCol->bytes * pCompBlock->numOfRows + COMP_OVERFLOW_BYTES; + pHelper->pBuffer = trealloc(pHelper->pBuffer, pCompCol->len); + if (pHelper->pBuffer == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + pHelper->compBuffer = trealloc(pHelper->compBuffer, tsize); + if (pHelper->compBuffer == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + if (lseek(pFile->fd, pCompCol->offset, SEEK_SET) < 0) { + tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + if (tread(pFile->fd, pHelper->pBuffer, pCompCol->len) < pCompCol->len) { + tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pCompCol->len, pFile->fname, + strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + if (tsdbCheckAndDecodeColumnData(pDataCol, pHelper->pBuffer, pCompCol->len, pCompBlock->algorithm, + pCompBlock->numOfRows, pHelper->pRepo->config.maxRowsPerFileBlock, + pHelper->compBuffer, tsizeof(pHelper->compBuffer)) < 0) { + tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pHelper->pRepo), pFile->fname, pCompCol->colId, + (int64_t)pCompCol->offset); + return -1; + } + + return 0; +} + +static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols, int16_t *colIds, int numOfColIds) { ASSERT(pCompBlock->numOfSubBlocks <= 1); - SCompData *pCompData = (SCompData *)pHelper->pBuffer; + SFile * pFile = (pCompBlock->last) ? &(pHelper->files.lastF) : &(pHelper->files.dataF); + + if (tsdbLoadCompData(pHelper, pCompBlock, NULL) < 0) goto _err; + + int dcol = 0; + int ccol = 0; + for (int i = 0; i < numOfColIds; i++) { + int16_t colId = colIds[i]; + SDataCol *pDataCol = NULL; + SCompCol *pCompCol = NULL; + + while (true) { + ASSERT(dcol < pDataCols->numOfCols); + pDataCol = &pDataCols->cols[dcol]; + ASSERT(pDataCol->colId <= colId); + if (pDataCol->colId == colId) break; + dcol++; + } + + ASSERT(pDataCol->colId == colId); + + while (ccol < pCompBlock->numOfCols) { + pCompCol = &pHelper->pCompData->cols[ccol]; + if (pCompCol->colId >= colId) break; + ccol++; + } + + if (ccol >= pCompBlock->numOfCols || pCompCol->colId > colId) { + dataColSetNEleNull(pDataCol, pCompBlock->numOfRows, pDataCols->maxPoints); + dcol++; + continue; + } + + ASSERT(pCompCol->colId == pDataCol->colId); + + if (tsdbLoadColData(pHelper, pFile, pCompBlock, pCompCol, pDataCol) < 0) goto _err; + dcol++; + ccol++; + } + + return 0; + +_err: + return -1; +} + +static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols) { + ASSERT(pCompBlock->numOfSubBlocks <= 1); SFile *pFile = (pCompBlock->last) ? &(pHelper->files.lastF) : &(pHelper->files.dataF); @@ -1262,6 +1330,8 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa goto _err; } + SCompData *pCompData = (SCompData *)pHelper->pBuffer; + int fd = pFile->fd; if (lseek(fd, pCompBlock->offset, SEEK_SET) < 0) { tsdbError("vgId:%d tid:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid, @@ -1277,7 +1347,7 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa } ASSERT(pCompData->numOfCols == pCompBlock->numOfCols); - int32_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols + sizeof(TSCKSUM); + int32_t tsize = TSDB_GET_COMPCOL_LEN(pCompBlock->numOfCols); if (!taosCheckChecksumWhole((uint8_t *)pCompData, tsize)) { tsdbError("vgId:%d file %s block data is corrupted offset %" PRId64 " len %d", REPO_ID(pHelper->pRepo), pFile->fname, (int64_t)(pCompBlock->offset), pCompBlock->len); @@ -1315,8 +1385,11 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa } if (tsdbCheckAndDecodeColumnData(pDataCol, (char *)pCompData + tsize + pCompCol->offset, pCompCol->len, pCompBlock->algorithm, pCompBlock->numOfRows, pDataCols->maxPoints, - pHelper->compBuffer, tsizeof(pHelper->compBuffer)) < 0) + pHelper->compBuffer, tsizeof(pHelper->compBuffer)) < 0) { + tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pHelper->pRepo), pFile->fname, + pCompCol->colId, (int64_t)pCompCol->offset); goto _err; + } dcol++; ccol++; } else if (pCompCol->colId < pDataCol->colId) { diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 6dd17b16267a472aa8ee6d9f4c50aa01c246006b..716a2b0feebd101445a5d9ece854c1347cc3a3ef 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -599,7 +599,7 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo tdInitDataCols(pCheckInfo->pDataCols, tsdbGetTableSchema(pCheckInfo->pTableObj)); - if (tsdbLoadBlockData(&(pQueryHandle->rhelper), pBlock, NULL) == 0) { + if (tsdbLoadBlockData(&(pQueryHandle->rhelper), pBlock) == 0) { SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo; pBlockLoadInfo->fileGroup = pQueryHandle->pFileGroup; diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 32bbfb16058ace68ddea3335e6cbfc4c6ac6fb60..894d214d855ca134768338abc7dc2de8cdbb872e 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -109,15 +109,23 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { pRet->rsp = pRsp; // current connect is broken - if (vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, pQInfo, pVnode->vgId) != TSDB_CODE_SUCCESS) { - vError("vgId:%d, QInfo:%p, dnode query discarded since link is broken, %p", pVnode->vgId, pQInfo, pReadMsg->rpcMsg.handle); - pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; - - //NOTE: there two refcount, needs to kill twice, todo refactor - qKillQuery(pQInfo, vnodeRelease, pVnode); - qKillQuery(pQInfo, vnodeRelease, pVnode); - - return pRsp->code; + if (code == TSDB_CODE_SUCCESS) { + if (vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, pQInfo, pVnode->vgId) != TSDB_CODE_SUCCESS) { + vError("vgId:%d, QInfo:%p, dnode query discarded since link is broken, %p", pVnode->vgId, pQInfo, + pReadMsg->rpcMsg.handle); + pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; + + // NOTE: there two refcount, needs to kill twice, todo refactor + qKillQuery(pQInfo, vnodeRelease, pVnode); + qKillQuery(pQInfo, vnodeRelease, pVnode); + + return pRsp->code; + } + + vTrace("vgId:%d, QInfo:%p, dnode query msg disposed", pVnode->vgId, pQInfo); + } else { + assert(pQInfo == NULL); + vnodeRelease(pVnode); } vDebug("vgId:%d, QInfo:%p, dnode query msg disposed", pVnode->vgId, pQInfo);