diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index b2b68bfe7f84dc4af71d647716c2be7d612d0f12..9dbc1239c59166d2206d25d781cd8db0e15928f7 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -14,15 +14,18 @@ */ #ifndef _TD_TSDB_MAIN_H_ #define _TD_TSDB_MAIN_H_ +#include +#include +#include -#include "os.h" #include "hash.h" +#include "os.h" #include "tcoding.h" #include "tglobal.h" #include "tkvstore.h" #include "tlist.h" -#include "tlog.h" #include "tlockfree.h" +#include "tlog.h" #include "tsdb.h" #include "tskiplist.h" #include "tutil.h" @@ -33,7 +36,9 @@ extern "C" { extern int tsdbDebugFlag; -#define tsdbFatal(...) { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TDB FATAL ", 255, __VA_ARGS__); }} +#define tsdbFatal(...) \ + { \ + if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TDB FATAL ", 255, __VA_ARGS__); }} #define tsdbError(...) { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TDB ERROR ", 255, __VA_ARGS__); }} #define tsdbWarn(...) { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TDB WARN ", 255, __VA_ARGS__); }} #define tsdbInfo(...) { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TDB ", 255, __VA_ARGS__); }} @@ -467,7 +472,7 @@ void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TS #define IS_REPO_LOCKED(r) (r)->repoLocked #define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg) -int tsdbGetFileName(char* rootDir, int type, int vid, int fid, int seq, char** fname); +int tsdbGetFileName(char* rootDir, int type, int vid, int fid, int seq, char* fname); int tsdbLockRepo(STsdbRepo* pRepo); int tsdbUnlockRepo(STsdbRepo* pRepo); char* tsdbGetDataDirName(char* rootDir); @@ -502,7 +507,7 @@ int tsdbSetAndOpenReadFGroup(SReadHandle* pReadH, SFileGroup* pFGroup); void tsdbCloseAndUnsetReadFile(SReadHandle* pReadH); int tsdbLoadBlockIdx(SReadHandle* pReadH); int tsdbSetReadTable(SReadHandle* pReadH, STable* pTable); -int tsdbLoadBlockInfo(SReadHandle* pReadH); +int tsdbLoadBlockInfo(SReadHandle* pReadH, void* pMem); int tsdbLoadBlockData(SReadHandle* pReadH, SBlock* pBlock, SBlockInfo* pBlockInfo); int tsdbLoadBlockDataCols(SReadHandle* pReadH, SBlock* pBlock, SBlockInfo* pBlockInfo, int16_t* colIds, int numOfCols); int tsdbLoadBlockDataInfo(SReadHandle* pReadH, SBlock* pBlock); @@ -532,6 +537,19 @@ int tsdbEncodeBlockIdx(void** buf, SBlockIdx* pBlockIdx); void* tsdbDecodeBlockIdx(void* buf, SBlockIdx* pBlockIdx); int tsdbLoadKeyCol(SReadHandle* pReadH, SBlockInfo* pBlockInfo, SBlock* pBlock); int tsdbGetCurrMinFid(int8_t precision, int32_t keep, int32_t days); +#define TSDB_DATA_DIR_NAME "data" +void *tsdbCommitData(void *arg); +void tsdbGetDataStatis(SReadHandle *pReadH, SDataStatis *pStatis, int numOfCols); + +static FORCE_INLINE int compTSKEY(const void* key1, const void* key2) { + if (*(TSKEY*)key1 > *(TSKEY*)key2) { + return 1; + } else if (*(TSKEY*)key1 == *(TSKEY*)key2) { + return 0; + } else { + return -1; + } +} #ifdef __cplusplus } diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index b2deb38721adea50e978074f1790d81e4d94986a..24ac32d9a10e5ee2944aa6cf4d3502f6af6efca8 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -15,6 +15,7 @@ #include #include +#include #include #include #include @@ -85,7 +86,7 @@ static int tsdbApplyFileChange(STsdbFileChange *pChange, bool isCom static void tsdbSeekTSCommitHandle(STSCommitHandle *pTSCh, TSKEY key); static int tsdbEncodeSFileGroup(void **buf, SFileGroup *pFGroup); static void * tsdbDecodeSFileGroup(void *buf, SFileGroup *pFGroup); -static void tsdbGetNextCommitFileGroup(STsdbRepo *pRepo, int vid, SFileGroup *pOldGroup, SFileGroup *pNewGroup); +static void tsdbGetNextCommitFileGroup(STsdbRepo *pRepo, SFileGroup *pOldGroup, SFileGroup *pNewGroup); static int tsdbCommitTableData(STSCommitHandle *pTSCh, int tid); static int tsdbWriteBlockToRightFile(STSCommitHandle *pTSCh, SDataCols *pDataCols, SBlock *pBlock); static int tsdbSetAndOpenCommitFGroup(STSCommitHandle *pTSCh, SFileGroup *pOldGroup, SFileGroup *pNewGroup); @@ -111,7 +112,8 @@ static int tsdbMergeDataBlock(STSCommitHandle *pTSCh, SBlock *pBlock); static void tsdbLoadMergeFromCache(STSCommitHandle *pTSCh, TSKEY maxKey); static int tsdbInsertSubBlock(STSCommitHandle *pTSCh, SBlock *pBlock); -int tsdbCommitData(STsdbRepo *pRepo) { +void *tsdbCommitData(void *arg) { + STsdbRepo *pRepo = (STsdbRepo *)arg; ASSERT(pRepo->commit == 1 && pRepo->imem != NULL); SCommitHandle commitHandle = {0}; @@ -119,20 +121,20 @@ int tsdbCommitData(STsdbRepo *pRepo) { pCommitH->pRepo = pRepo; - if (tsdbStartCommit(pCommitH) < 0) return -1; + if (tsdbStartCommit(pCommitH) < 0) return NULL; if (tsdbCommitTimeSeriesData(pCommitH) < 0) { tsdbEndCommit(pCommitH, true); - return -1; + return NULL; } if (tsdbCommitMetaData(pCommitH) < 0) { tsdbEndCommit(pCommitH, true); - return -1; + return NULL; } tsdbEndCommit(pCommitH, false); - return 0; + return NULL; } static int tsdbStartCommit(SCommitHandle *pCommitH) { @@ -151,7 +153,7 @@ static int tsdbStartCommit(SCommitHandle *pCommitH) { pCommitH->fd = -1; - tsdbGetFileName(pRepo->rootDir, TSDB_FILE_TYPE_MANIFEST, pCfg->tsdbId, 0, 0, &(pCommitH->fname)); + tsdbGetFileName(pRepo->rootDir, TSDB_FILE_TYPE_MANIFEST, pCfg->tsdbId, 0, 0, pCommitH->fname); pCommitH->fd = open(pCommitH->fname, O_CREAT | O_WRONLY | O_APPEND, 0755); if (pCommitH->fd < 0) { tsdbError("vgId:%d failed to open file %s since %s", REPO_ID(pRepo), pCommitH->fname, strerror(errno)); @@ -538,7 +540,7 @@ static int tsdbLogTSFileChange(SCommitHandle *pCommitH, int fid) { pDataFileChange->ofgroup = *pFGroup; } - tsdbGetNextCommitFileGroup(&(pDataFileChange->ofgroup), &(pDataFileChange->nfgroup)); + tsdbGetNextCommitFileGroup(pRepo, &(pDataFileChange->ofgroup), &(pDataFileChange->nfgroup)); if (tsdbLogFileChange(pCommitH, pChange) < 0) { free(pNode); @@ -560,7 +562,7 @@ static int tsdbLogMetaFileChange(SCommitHandle *pCommitH) { return -1; } - STsdbFileChange *pChange = pNode->data; + STsdbFileChange *pChange = (STsdbFileChange *)pNode->data; pChange->type = TSDB_META_FILE_CHANGE; SMetaFileChange *pMetaChange = (SMetaFileChange *)(pChange->change); @@ -600,7 +602,7 @@ static int tsdbLogRetentionChange(SCommitHandle *pCommitH, int mfid) { free(pNode); return -1; } - tdListAppendNode(pCommitH->pModLog, &pChange); + tdListAppendNode(pCommitH->pModLog, pNode); } else { break; } @@ -653,20 +655,21 @@ static int tsdbEncodeSFileGroup(void **buf, SFileGroup *pFGroup) { return tsize; } -static void *tsdbDecodeSFileGroup(void *buf, SFileGroup *pFGroup) { +static UNUSED_FUNC void *tsdbDecodeSFileGroup(void *buf, SFileGroup *pFGroup) { buf = taosDecodeVariantI32(buf, &(pFGroup->fileId)); for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { SFile *pFile = &(pFGroup->files[type]); + char *fname = pFile->fname; - buf = taosDecodeString(buf, &(pFile->fname)); + buf = taosDecodeString(buf, &fname); buf = tsdbDecodeSFileInfo(buf, &(pFile->info)); } return buf; } -static void tsdbGetNextCommitFileGroup(STsdbRepo *pRepo, int vid, SFileGroup *pOldGroup, SFileGroup *pNewGroup) { +static void tsdbGetNextCommitFileGroup(STsdbRepo *pRepo, SFileGroup *pOldGroup, SFileGroup *pNewGroup) { pNewGroup->fileId = pOldGroup->fileId; for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { @@ -675,9 +678,9 @@ static void tsdbGetNextCommitFileGroup(STsdbRepo *pRepo, int vid, SFileGroup *pO size_t len =strlen(pOldFile->fname); if (len == 0 || pOldFile->fname[len - 1] == '1') { - tsdbGetFileName(pRepo->rootDir, type, vid, pOldGroup->fileId, 0, pNewFile->fname); + tsdbGetFileName(pRepo->rootDir, type, REPO_ID(pRepo), pOldGroup->fileId, 0, pNewFile->fname); } else { - tsdbGetFileName(pRepo->rootDir, type, vid, pOldGroup->fileId, 1, pNewFile->fname); + tsdbGetFileName(pRepo->rootDir, type, REPO_ID(pRepo), pOldGroup->fileId, 1, pNewFile->fname); } } } @@ -685,7 +688,6 @@ static void tsdbGetNextCommitFileGroup(STsdbRepo *pRepo, int vid, SFileGroup *pO static int tsdbCommitTableData(STSCommitHandle *pTSCh, int tid) { SCommitIter *pIter = pTSCh->pIters + tid; SReadHandle *pReadH = pTSCh->pReadH; - SDataCols * pDataCols = pTSCh->pDataCols; TSKEY keyNext = tsdbNextIterKey(pIter->pIter); taosRLockLatch(&(pIter->pTable->latch)); @@ -701,7 +703,7 @@ static int tsdbCommitTableData(STSCommitHandle *pTSCh, int tid) { return 0; } - if (tsdbLoadBlockInfo(pReadH) < 0) { + if (tsdbLoadBlockInfo(pReadH, NULL) < 0) { taosRUnLockLatch(&(pIter->pTable->latch)); return -1; } @@ -789,8 +791,8 @@ static void tsdbCloseAndUnsetCommitFGroup(STSCommitHandle *pTSCh, bool hasError) tsdbCloseAndUnsetReadFile(pTSCh->pReadH); for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { - SFile *pOldFile = TSDB_FILE_IN_FGROUP(pOldGroup, type); - SFile *pNewFile = TSDB_FILE_IN_FGROUP(pNewGroup, type); + // SFile *pOldFile = TSDB_FILE_IN_FGROUP(pOldGroup, type); + SFile *pNewFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, type); if (pNewFile->fd >= 0) { if (!hasError) { @@ -805,16 +807,17 @@ static void tsdbCloseAndUnsetCommitFGroup(STSCommitHandle *pTSCh, bool hasError) static int tsdbWriteBlockInfo(STSCommitHandle *pTSCh) { ASSERT(pTSCh->nBlocks > 0); SReadHandle *pReadH = pTSCh->pReadH; + STsdbRepo * pRepo = pReadH->pRepo; SBlockInfo * pBlockInfo = pTSCh->pBlockInfo; SFile * pFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_HEAD); - int tlen = TSDB_BLOCK_INFO_LEN(pTSCh->nBlocks, pTSCh->nSubBlocks); + int tlen = TSDB_BLOCK_INFO_LEN(pTSCh->nBlocks+pTSCh->nSubBlocks); pBlockInfo->delimiter = TSDB_FILE_DELIMITER; pBlockInfo->uid = TABLE_UID(pReadH->pTable); pBlockInfo->tid = TABLE_TID(pReadH->pTable); if (pTSCh->nSubBlocks > 0) { - if (tsdbAllocBuf(&(pTSCh->pBlockInfo), tlen) < 0) { + if (tsdbAllocBuf((void **)(&(pTSCh->pBlockInfo)), tlen) < 0) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } @@ -866,7 +869,7 @@ static int tsdbWriteBlockIdx(STSCommitHandle *pTSCh) { // label checksum len += sizeof(TSCKSUM); - if (tsdbAllocBuf(&(pReadH->pBuf), len) < 0) { + if (tsdbAllocBuf((void **)(&(pReadH->pBuf)), len) < 0) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } @@ -891,8 +894,8 @@ static int tsdbWriteBlockIdx(STSCommitHandle *pTSCh) { pFile->info.size += len; pFile->info.offset = (uint32_t)offset; pFile->info.len = len; - pFile->info.magic = taosCalcChecksum(pFile->info.magic, - (uint8_t *)POINTER_SHIFT(pReadH->pBuf, len - sizeof(TSCKSUM), sizeof(TSCKSUM))); + pFile->info.magic = taosCalcChecksum(pFile->info.magic, (uint8_t *)POINTER_SHIFT(pReadH->pBuf, len - sizeof(TSCKSUM)), + sizeof(TSCKSUM)); ASSERT(pFile->info.size == pFile->info.offset + pFile->info.len); @@ -912,7 +915,6 @@ static int tsdbSetCommitTable(STSCommitHandle *pTSCh, STable *pTable) { static int tsdbCommitTableDataImpl(STSCommitHandle *pTSCh, int tid) { SCommitIter *pIter = pTSCh->pIters + tid; SReadHandle *pReadH = pTSCh->pReadH; - SDataCols * pDataCols = pTSCh->pDataCols; SBlockIdx * pOldIdx = pReadH->pCurBlockIdx; TSKEY keyNext = tsdbNextIterKey(pIter->pIter); @@ -1025,7 +1027,7 @@ static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, int ftype, SDataCols *pD ASSERT(pDataCols->numOfRows > 0 && pDataCols->numOfRows <= pCfg->maxRowsPerFileBlock); ASSERT(isLast ? pDataCols->numOfRows < pCfg->minRowsPerFileBlock : true); - if (tsdbAllocBuf(&(pReadH->pBlockData), csize) < 0) { + if (tsdbAllocBuf((void **)(&(pReadH->pBlockData)), csize) < 0) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } @@ -1042,7 +1044,7 @@ static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, int ftype, SDataCols *pD nColsNotAllNull++; csize = TSDB_BLOCK_DATA_LEN(nColsNotAllNull); - if (tsdbAllocBuf(&(pReadH->pBlockData), csize) < 0) { + if (tsdbAllocBuf((void **)(&(pReadH->pBlockData)), csize) < 0) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } @@ -1065,7 +1067,7 @@ static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, int ftype, SDataCols *pD int32_t blen = olen + COMP_OVERFLOW_BYTES; // allocated buffer length int32_t clen = 0; - if (tsdbAllocBuf(&(pReadH->pBuf), coffset + blen + sizeof(TSCKSUM)) < 0) { + if (tsdbAllocBuf((void **)(&(pReadH->pBuf)), coffset + blen + sizeof(TSCKSUM)) < 0) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } @@ -1074,7 +1076,7 @@ static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, int ftype, SDataCols *pD if (pCfg->compression) { if (pCfg->compression == TWO_STAGE_COMP) { - if (tsdbAllocBuf(&(pReadH->pCBuf), blen) < 0) { + if (tsdbAllocBuf((void **)(&(pReadH->pCBuf)), blen) < 0) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } @@ -1084,7 +1086,7 @@ static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, int ftype, SDataCols *pD blen, pCfg->compression, pReadH->pCBuf, blen); } else { clen = olen; - memcpy(pData, olen); + memcpy(pData, pDataCol->pData, olen); } ASSERT(clen > 0 && clen <= blen); @@ -1155,7 +1157,7 @@ static int tsdbEncodeBlockIdxArray(STSCommitHandle *pTSCh) { for (int i = 0; i < pTSCh->nBlockIdx; i++) { int tlen = tsdbEncodeBlockIdx(NULL, pTSCh->pBlockIdx + i); - if (tsdbAllocBuf(&(pReadH->pBuf), tlen + len) < 0) { + if (tsdbAllocBuf((void **)(&(pReadH->pBuf)), tlen + len) < 0) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } @@ -1179,7 +1181,7 @@ static int tsdbUpdateFileGroupInfo(SFileGroup *pFileGroup) { } static int tsdbAppendBlockIdx(STSCommitHandle *pTSCh) { - if (tsdbAllocBuf(&(pTSCh->pBlockIdx), sizeof(SBlockIdx) * (pTSCh->nBlockIdx + 1)) < 0) { + if (tsdbAllocBuf((void **)(&(pTSCh->pBlockIdx)), sizeof(SBlockIdx) * (pTSCh->nBlockIdx + 1)) < 0) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } @@ -1245,7 +1247,7 @@ static int tsdbAddSuperBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { ASSERT(pBlock->numOfSubBlocks > 0); int tsize = TSDB_BLOCK_INFO_LEN(pTSCh->nBlocks + 1); - if (tsdbAllocBuf(&(pTSCh->pBlockInfo), tsize) < 0) { + if (tsdbAllocBuf((void **)(&(pTSCh->pBlockInfo)), tsize) < 0) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } @@ -1260,7 +1262,7 @@ static int tsdbAddSubBlocks(STSCommitHandle *pTSCh, SBlock *pBlocks, int nBlocks int tBlocks = pTSCh->nSubBlocks + nBlocks; int tsize = sizeof(SBlock) * tBlocks; - if (tsdbAllocBuf(&(pTSCh->pSubBlock), tsize) < 0) { + if (tsdbAllocBuf((void **)(&(pTSCh->pSubBlock)), tsize) < 0) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } @@ -1280,7 +1282,6 @@ static int tsdbMergeLastBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { SCommitIter *pIter = pTSCh->pIters + TABLE_TID(pTable); int dbrows = TSDB_DEFAULT_ROWS_TO_COMMIT(pCfg->maxRowsPerFileBlock); SBlock newBlock = {0}; - SFile * pFile = NULL; TSKEY keyNext = tsdbNextIterKey(pIter->pIter); if (keyNext > pBlock->keyLast) { // append merge last block @@ -1373,7 +1374,7 @@ static int tsdbMergeDataBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { } // Commit data to keyLimit included - if (tsdbLoadKeyCol(pReadH, pBlock, NULL) < 0) return -1; + if (tsdbLoadKeyCol(pReadH, NULL, pBlock) < 0) return -1; rows = tsdbLoadDataFromCache(pIter->pTable, &titer, pBlock->keyLast, INT32_MAX, NULL, pReadH->pDataCols[0]->cols[0].pData, pBlock->numOfRows); diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 6c9cae687d4e2aecc86b1c2baafd855365b6a981..4ce77c6d9c9ecef2deb112ad9d9ce56039f27d02 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -13,8 +13,11 @@ * along with this program. If not, see . */ #define _DEFAULT_SOURCE +#include #include +#include #include +#include #define TAOS_RANDOM_FILE_FAIL_TEST @@ -69,6 +72,7 @@ void tsdbFreeFileH(STsdbFileH *pFileH) { } } +// TODO: refactor this function int tsdbOpenFileH(STsdbRepo *pRepo) { ASSERT(pRepo != NULL && pRepo->tsdbFileH != NULL); @@ -125,7 +129,7 @@ int tsdbOpenFileH(STsdbRepo *pRepo) { if (fid < mfid) { for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { - tsdbGetFileName(pRepo->rootDir, pCfg->tsdbId, fid, type, fname); + tsdbGetFileName(pRepo->rootDir, type, pCfg->tsdbId, fid, 0, fname); (void)remove(fname); } continue; @@ -342,7 +346,7 @@ int tsdbCreateFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) { memset((void *)pFile, 0, sizeof(SFile)); pFile->fd = -1; - tsdbGetFileName(pRepo->rootDir, REPO_ID(pRepo), fid, type, pFile->fname); + tsdbGetFileName(pRepo->rootDir, type, REPO_ID(pRepo), fid, type, pFile->fname); if (access(pFile->fname, F_OK) == 0) { tsdbError("vgId:%d file %s already exists", REPO_ID(pRepo), pFile->fname); @@ -525,7 +529,7 @@ int tsdbGetCurrMinFid(int8_t precision, int32_t keep, int32_t days) { static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) { uint32_t version; - tsdbGetFileName(pRepo->rootDir, REPO_ID(pRepo), fid, type, pFile->fname); + tsdbGetFileName(pRepo->rootDir, type, REPO_ID(pRepo), fid, 0, pFile->fname); pFile->fd = -1; if (tsdbOpenFile(pFile, O_RDONLY) < 0) goto _err; diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 040bde674483d680d86ad358fb4f2748ee072a4e..0974b00041cf53be2a757d9521391b87548d760c 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -12,6 +12,10 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ +#include +#include +#include +#include // no test file errors here #include "tsdbMain.h" @@ -212,7 +216,7 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_ // STsdbMeta *pMeta = pRepo->tsdbMeta; STsdbFileH *pFileH = pRepo->tsdbFileH; uint32_t magic = 0; - char * fname = NULL; + char fname[TSDB_FILENAME_LEN] = "\0"; struct stat fState; @@ -229,7 +233,7 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_ if (pFileH->nFGroups == 0 || fid > pFileH->pFGroup[pFileH->nFGroups - 1].fileId) { if (*index <= TSDB_META_FILE_INDEX && TSDB_META_FILE_INDEX <= eindex) { - tsdbGetFileName(pRepo->rootDir, TSDB_FILE_TYPE_META, 0, 0, 0, &fname); + tsdbGetFileName(pRepo->rootDir, TSDB_FILE_TYPE_META, 0, 0, 0, fname); *index = TSDB_META_FILE_INDEX; magic = TSDB_META_FILE_MAGIC(pRepo->tsdbMeta); } else { @@ -239,11 +243,11 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_ SFileGroup *pFGroup = taosbsearch(&fid, pFileH->pFGroup, pFileH->nFGroups, sizeof(SFileGroup), keyFGroupCompFunc, TD_GE); if (pFGroup->fileId == fid) { - fname = strdup(pFGroup->files[(*index) % TSDB_FILE_TYPE_MAX].fname); + strncpy(fname, pFGroup->files[(*index) % TSDB_FILE_TYPE_MAX].fname, TSDB_FILENAME_LEN); magic = pFGroup->files[(*index) % TSDB_FILE_TYPE_MAX].info.magic; } else { if ((pFGroup->fileId + 1) * TSDB_FILE_TYPE_MAX - 1 < (int)eindex) { - fname = strdup(pFGroup->files[0].fname); + strncpy(fname, pFGroup->files[0].fname, TSDB_FILENAME_LEN); *index = pFGroup->fileId * TSDB_FILE_TYPE_MAX; magic = pFGroup->files[0].info.magic; } else { @@ -253,10 +257,8 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_ } strcpy(name, fname + prefixLen); } else { // get the named file at the specified index. If not there, return 0 - fname = malloc(prefixLen + strlen(name) + 2); sprintf(fname, "%s/%s", prefix, name); if (access(fname, F_OK) != 0) { - taosFree(fname); taosFree(sdup); return 0; } @@ -265,20 +267,17 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_ } else { tsdbGetFileInfoImpl(fname, &magic, size); } - taosFree(fname); taosFree(sdup); return magic; } if (stat(fname, &fState) < 0) { - taosTFree(fname); return 0; } *size = fState.st_size; // magic = *size; - taosTFree(fname); return magic; } @@ -517,15 +516,14 @@ static int32_t tsdbSetRepoEnv(char *rootDir, STsdbCfg *pCfg) { free(dirName); - char *fname = tsdbGetMetaFileName(rootDir); + char fname[TSDB_FILENAME_LEN] = "\0"; + tsdbGetFileName(rootDir, TSDB_FILE_TYPE_META, 0, 0, 0, fname); if (fname == NULL) return -1; if (tdCreateKVStore(fname) < 0) { tsdbError("vgId:%d failed to open KV store since %s", pCfg->tsdbId, tstrerror(terrno)); - free(fname); return -1; } - free(fname); return 0; } @@ -541,7 +539,7 @@ static int32_t tsdbSaveConfig(char *rootDir, STsdbCfg *pCfg) { char buf[TSDB_FILE_HEAD_SIZE] = "\0"; char *pBuf = buf; - tsdbGetFileName(rootDir, TSDB_FILE_TYPE_CFG, 0, 0, 0, &fname); + tsdbGetFileName(rootDir, TSDB_FILE_TYPE_CFG, 0, 0, 0, fname); fd = open(fname, O_WRONLY | O_CREAT, 0755); if (fd < 0) { @@ -581,7 +579,7 @@ static int tsdbLoadConfig(char *rootDir, STsdbCfg *pCfg) { int fd = -1; char buf[TSDB_FILE_HEAD_SIZE] = "\0"; - tsdbGetFileName(rootDir, TSDB_FILE_TYPE_CFG, 0, 0, 0, &fname); + tsdbGetFileName(rootDir, TSDB_FILE_TYPE_CFG, 0, 0, 0, fname); fd = open(fname, O_RDONLY); if (fd < 0) { @@ -769,31 +767,43 @@ static int tsdbRestoreInfo(STsdbRepo *pRepo) { SFileGroup *pFGroup = NULL; SFileGroupIter iter; - SRWHelper rhelper = {0}; - - if (tsdbInitReadHelper(&rhelper, pRepo) < 0) goto _err; + SReadHandle * pReadH = tsdbNewReadHandle(pRepo); + if (pReadH == NULL) return -1; tsdbInitFileGroupIter(pFileH, &iter, TSDB_ORDER_DESC); while ((pFGroup = tsdbGetFileGroupNext(&iter)) != NULL) { if (pFGroup->state) continue; - if (tsdbSetAndOpenHelperFile(&rhelper, pFGroup) < 0) goto _err; - if (tsdbLoadCompIdx(&rhelper, NULL) < 0) goto _err; + if (tsdbSetAndOpenReadFGroup(pReadH, pFGroup) < 0) { + tsdbFreeReadHandle(pReadH); + return -1; + } + + if (tsdbLoadBlockIdx(pReadH) < 0) { + tsdbCloseAndUnsetReadFile(pReadH); + tsdbFreeReadHandle(pReadH); + return -1; + } + for (int i = 1; i < pMeta->maxTables; i++) { STable *pTable = pMeta->tables[i]; if (pTable == NULL) continue; - if (tsdbSetHelperTable(&rhelper, pTable, pRepo) < 0) goto _err; - SBlockIdx *pIdx = &(rhelper.curCompIdx); + if (tsdbSetReadTable(pReadH, pTable) < 0) { + tsdbCloseAndUnsetReadFile(pReadH); + tsdbFreeReadHandle(pReadH); + return -1; + } + + if (pReadH->pCurBlockIdx != NULL && pTable->lastKey < pReadH->pCurBlockIdx->maxKey) { + pTable->lastKey = pReadH->pCurBlockIdx->maxKey; - if (pIdx->offset > 0 && pTable->lastKey < pIdx->maxKey) pTable->lastKey = pIdx->maxKey; + } } + + tsdbCloseAndUnsetReadFile(pReadH); } - tsdbDestroyHelper(&rhelper); + tsdbFreeReadHandle(pReadH); return 0; - -_err: - tsdbDestroyHelper(&rhelper); - return -1; } static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) { diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index ea3cef659da95ff5ae767ae978afdebe99314478..a87378e60579acb54db67475adbbe587bf4afbe2 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -12,6 +12,8 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ +#include +#include #include "tsdb.h" #include "tsdbMain.h" @@ -23,6 +25,7 @@ static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo); static void tsdbFreeMemTable(SMemTable *pMemTable); static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable); static void tsdbFreeTableData(STableData *pTableData); +static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables); static char * tsdbGetTsTupleKey(const void *data); // ---------------- INTERNAL FUNCTIONS ---------------- diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index e52f5819348d9cf921bd7268f96ccac5d853f6bd..c832772d25cecb2adc850de7a77d767f2a02633e 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -468,11 +468,11 @@ void tsdbFreeMeta(STsdbMeta *pMeta) { } int tsdbOpenMeta(STsdbRepo *pRepo) { - char * fname = NULL; + char fname[TSDB_FILENAME_LEN] = "\0"; STsdbMeta *pMeta = pRepo->tsdbMeta; ASSERT(pMeta != NULL); - if (tsdbGetFileName(pRepo->rootDir, TSDB_FILE_TYPE_META, 0, 0, 0, &fname) < 0) goto _err; + if (tsdbGetFileName(pRepo->rootDir, TSDB_FILE_TYPE_META, 0, 0, 0, fname) < 0) goto _err; pMeta->pStore = tdOpenKVStore(fname, tsdbRestoreTable, tsdbOrgMeta, (void *)pRepo); if (pMeta->pStore == NULL) { @@ -481,11 +481,9 @@ int tsdbOpenMeta(STsdbRepo *pRepo) { } tsdbDebug("vgId:%d open TSDB meta succeed", REPO_ID(pRepo)); - taosTFree(fname); return 0; _err: - taosTFree(fname); return -1; } diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 1ae666b2ca5ae7c05a92bc4e7d1d2f252b83463d..6ffd79e6cc11a574ce4bd32733abe1b10606d0d8 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -12,6 +12,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ +#include #include "os.h" #include "tulog.h" @@ -114,7 +115,7 @@ typedef struct STsdbQueryHandle { int32_t type; // query type: retrieve all data blocks, 2. retrieve only last row, 3. retrieve direct prev|next rows SFileGroup* pFileGroup; SFileGroupIter fileIter; - SRWHelper rhelper; + SReadHandle* rhelper; STableBlockInfo* pDataBlockInfo; SDataCols *pDataCols; // in order to hold current file data block @@ -266,7 +267,7 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(TSDB_REPO_T* tsdb, STsdbQueryCond* pQueryHandle->allocSize = 0; pQueryHandle->locateStart = false; - if (tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb) != 0) { + if ((pQueryHandle->rhelper = tsdbNewReadHandle((STsdbRepo *)tsdb)) == NULL) { goto out_of_memory; } @@ -671,15 +672,15 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); pCheckInfo->numOfBlocks = 0; - if (tsdbSetHelperTable(&pQueryHandle->rhelper, pCheckInfo->pTableObj, pQueryHandle->pTsdb) != TSDB_CODE_SUCCESS) { + if (tsdbSetReadTable(pQueryHandle->rhelper, pCheckInfo->pTableObj) < 0) { code = terrno; break; } - SBlockIdx* compIndex = &pQueryHandle->rhelper.curCompIdx; + SBlockIdx* compIndex = pQueryHandle->rhelper->pCurBlockIdx; // no data block in this file, try next file - if (compIndex->len == 0 || compIndex->numOfBlocks == 0 || compIndex->uid != pCheckInfo->tableId.uid) { + if (compIndex == NULL || compIndex->numOfBlocks == 0 || compIndex->uid != pCheckInfo->tableId.uid) { continue; // no data blocks in the file belongs to pCheckInfo->pTable } @@ -697,7 +698,9 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo pCheckInfo->compSize = compIndex->len; } - tsdbLoadCompInfo(&(pQueryHandle->rhelper), (void *)(pCheckInfo->pCompInfo)); + if (tsdbLoadBlockInfo(pQueryHandle->rhelper, (void *)(pCheckInfo->pCompInfo)) < 0) { + // TODO: deal with the error here + } SBlockInfo* pCompInfo = pCheckInfo->pCompInfo; TSKEY s = TSKEY_INITIAL_VAL, e = TSKEY_INITIAL_VAL; @@ -736,7 +739,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo return code; } -static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo, int32_t slotIndex) { +static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo, int32_t slotIndex) { int64_t st = taosGetTimestampUs(); STSchema *pSchema = tsdbGetTableSchema(pCheckInfo->pTableObj); @@ -747,14 +750,14 @@ static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* p goto _error; } - code = tdInitDataCols(pQueryHandle->rhelper.pDataCols[0], pSchema); + code = tdInitDataCols(pQueryHandle->rhelper->pDataCols[0], pSchema); if (code != TSDB_CODE_SUCCESS) { tsdbError("%p failed to malloc buf for rhelper.pDataCols[0], %p", pQueryHandle, pQueryHandle->qinfo); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; goto _error; } - code = tdInitDataCols(pQueryHandle->rhelper.pDataCols[1], pSchema); + code = tdInitDataCols(pQueryHandle->rhelper->pDataCols[1], pSchema); if (code != TSDB_CODE_SUCCESS) { tsdbError("%p failed to malloc buf for rhelper.pDataCols[1], %p", pQueryHandle, pQueryHandle->qinfo); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; @@ -763,7 +766,7 @@ static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* p int16_t* colIds = pQueryHandle->defaultLoadColumn->pData; - int32_t ret = tsdbLoadBlockDataCols(&(pQueryHandle->rhelper), pBlock, pCheckInfo->pCompInfo, colIds, (int)(QH_GET_NUM_OF_COLS(pQueryHandle))); + int32_t ret = tsdbLoadBlockDataCols(pQueryHandle->rhelper, pBlock, pCheckInfo->pCompInfo, colIds, (int)(QH_GET_NUM_OF_COLS(pQueryHandle))); if (ret != TSDB_CODE_SUCCESS) { int32_t c = terrno; assert(c != TSDB_CODE_SUCCESS); @@ -776,7 +779,7 @@ static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* p pBlockLoadInfo->slot = pQueryHandle->cur.slot; pBlockLoadInfo->tid = pCheckInfo->pTableObj->tableId.tid; - SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0]; + SDataCols* pCols = pQueryHandle->rhelper->pDataCols[0]; assert(pCols->numOfRows != 0 && pCols->numOfRows <= pBlock->numOfRows); pBlock->numOfRows = pCols->numOfRows; @@ -896,7 +899,7 @@ static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBlock, return code; } - SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0]; + SDataCols* pTSCol = pQueryHandle->rhelper->pDataCols[0]; assert(pTSCol->cols->type == TSDB_DATA_TYPE_TIMESTAMP && pTSCol->numOfRows == pBlock->numOfRows); if (pCheckInfo->lastKey > pBlock->keyFirst) { @@ -919,7 +922,7 @@ static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBlock, return code; } - SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0]; + SDataCols* pTSCol = pQueryHandle->rhelper->pDataCols[0]; if (pCheckInfo->lastKey < pBlock->keyLast) { cur->pos = binarySearchForKey(pTSCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pQueryHandle->order); } else { @@ -1004,7 +1007,7 @@ int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity char* pData = NULL; int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1 : -1; - SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0]; + SDataCols* pCols = pQueryHandle->rhelper->pDataCols[0]; TSKEY* tsArray = pCols->cols[0].pData; int32_t num = end - start + 1; @@ -1228,7 +1231,7 @@ static void doCheckGeneratedBlockRange(STsdbQueryHandle* pQueryHandle) { static void copyAllRemainRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SDataBlockInfo* pBlockInfo, int32_t endPos) { SQueryFilePos* cur = &pQueryHandle->cur; - SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0]; + SDataCols* pCols = pQueryHandle->rhelper->pDataCols[0]; TSKEY* tsArray = pCols->cols[0].pData; int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1; @@ -1272,7 +1275,7 @@ int32_t getEndPosInDataBlock(STsdbQueryHandle* pQueryHandle, SDataBlockInfo* pBl int32_t order = ASCENDING_TRAVERSE(pQueryHandle->order)? TSDB_ORDER_DESC : TSDB_ORDER_ASC; SQueryFilePos* cur = &pQueryHandle->cur; - SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0]; + SDataCols* pCols = pQueryHandle->rhelper->pDataCols[0]; if (ASCENDING_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey >= pBlockInfo->window.ekey) { endPos = pBlockInfo->rows - 1; @@ -1297,7 +1300,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* initTableMemIterator(pQueryHandle, pCheckInfo); - SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0]; + SDataCols* pCols = pQueryHandle->rhelper->pDataCols[0]; assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == PRIMARYKEY_TIMESTAMP_COL_INDEX && cur->pos >= 0 && cur->pos < pBlock->numOfRows); @@ -1681,7 +1684,7 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex break; } - if (tsdbSetAndOpenHelperFile(&pQueryHandle->rhelper, pQueryHandle->pFileGroup) < 0) { + if (tsdbSetAndOpenReadFGroup(pQueryHandle->rhelper, pQueryHandle->pFileGroup) < 0) { pthread_rwlock_unlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock); code = terrno; break; @@ -1689,7 +1692,7 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex pthread_rwlock_unlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock); - if (tsdbLoadCompIdx(&pQueryHandle->rhelper, NULL) < 0) { + if (tsdbLoadBlockIdx(pQueryHandle->rhelper) < 0) { code = terrno; break; } @@ -2168,7 +2171,9 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta } int64_t stime = taosGetTimestampUs(); - tsdbLoadCompData(&pHandle->rhelper, pBlockInfo->compBlock, NULL); + if (tsdbLoadBlockDataInfo(pHandle->rhelper, pBlockInfo->compBlock) < 0) { + // TODO: deal with the error here + } int16_t* colIds = pHandle->defaultLoadColumn->pData; @@ -2178,7 +2183,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta pHandle->statis[i].colId = colIds[i]; } - tsdbGetDataStatis(&pHandle->rhelper, pHandle->statis, (int)numOfCols); + tsdbGetDataStatis(pHandle->rhelper, pHandle->statis, (int)numOfCols); // always load the first primary timestamp column data SDataStatis* pPrimaryColStatis = &pHandle->statis[0]; @@ -2703,7 +2708,7 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) { // todo check error tsdbUnTakeMemSnapShot(pQueryHandle->pTsdb, pQueryHandle->mem, pQueryHandle->imem); - tsdbDestroyHelper(&pQueryHandle->rhelper); + tsdbFreeReadHandle(pQueryHandle->rhelper); tdFreeDataCols(pQueryHandle->pDataCols); pQueryHandle->pDataCols = NULL; diff --git a/src/tsdb/src/tsdbReadUtil.c b/src/tsdb/src/tsdbReadUtil.c index dc9de4b01621fb96cda3fb6828bf3ee798a48474..ec3309eb13d7a9164bf9fdeec0ed10b2bb03564e 100644 --- a/src/tsdb/src/tsdbReadUtil.c +++ b/src/tsdb/src/tsdbReadUtil.c @@ -14,6 +14,7 @@ */ #include #include +#include #include #include @@ -31,8 +32,7 @@ static int tsdbDecodeBlockIdxArray(SReadHandle *pReadH); static int tsdbVerifyBlockInfo(SBlockInfo *pBlockInfo, SBlockIdx *pBlockIdx); static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32_t len, int8_t comp, int numOfRows, int maxPoints, char *buffer, int bsize); -static int tsdbLoadColData(SReadHandle *pReadH, SFile *pFile, SBlock *pBlock, SBlockCol *pBlockCol, - SDataCol *pDataCol); +static int tsdbLoadColData(SReadHandle *pReadH, SFile *pFile, SBlock *pBlock, SBlockCol *pBlockCol, SDataCol *pDataCol); SReadHandle *tsdbNewReadHandle(STsdbRepo *pRepo) { SReadHandle *pReadH = (SReadHandle *)calloc(1, sizeof(*pReadH)); @@ -61,6 +61,7 @@ SReadHandle *tsdbNewReadHandle(STsdbRepo *pRepo) { void tsdbFreeReadHandle(SReadHandle *pReadH) { if (pReadH) { + tsdbCloseAndUnsetReadFile(pReadH); taosTZfree(pReadH->pBlockIdx); taosTZfree(pReadH->pBlockInfo); taosTZfree(pReadH->pBlockData); @@ -79,7 +80,7 @@ int tsdbSetAndOpenReadFGroup(SReadHandle *pReadH, SFileGroup *pFGroup) { pReadH->fGroup = *pFGroup; - tsdbResetFGroupFd(&(pReadH->fGroup)); + tsdbCloseAndUnsetReadFile(pReadH); for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { SFile *pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), type); @@ -123,7 +124,7 @@ int tsdbLoadBlockIdx(SReadHandle *pReadH) { ASSERT(pFile->info.size == pFile->info.offset + pFile->info.len); - if (tsdbAllocBuf(&(pReadH->pBuf), pFile->info.len) < 0) { + if (tsdbAllocBuf((void **)(&(pReadH->pBuf)), pFile->info.len) < 0) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } @@ -207,7 +208,7 @@ int tsdbSetReadTable(SReadHandle *pReadH, STable *pTable) { return 0; } -int tsdbLoadBlockInfo(SReadHandle *pReadH) { +int tsdbLoadBlockInfo(SReadHandle *pReadH, void *pMem) { ASSERT(pReadH != NULL); if (pReadH->pCurBlockIdx == NULL) return 0; @@ -218,7 +219,7 @@ int tsdbLoadBlockInfo(SReadHandle *pReadH) { ASSERT(pFile->fd > 0 && pBlockIdx->len > 0); - if (tsdbAllocBuf(&((void *)(pReadH->pBlockInfo)), pBlockIdx->len) < 0) { + if (pMem == NULL && tsdbAllocBuf((void **)(&(pReadH->pBlockInfo)), pBlockIdx->len) < 0) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } @@ -230,7 +231,9 @@ int tsdbLoadBlockInfo(SReadHandle *pReadH) { return -1; } - ssize_t ret = taosTRead(pFile->fd, (void *)(pReadH->pBlockInfo), pBlockIdx->len); + if (pMem == NULL) pMem = (void *)(pReadH->pBlockInfo); + + ssize_t ret = taosTRead(pFile->fd, pMem, pBlockIdx->len); if (ret < 0) { tsdbError("vgId:%d failed to read block info part of table %s from file %s since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pReadH->pTable), pFile->fname, strerror(errno)); @@ -238,7 +241,7 @@ int tsdbLoadBlockInfo(SReadHandle *pReadH) { return -1; } - if (ret < pBlockIdx->len || tsdbVerifyBlockInfo(pReadH->pBlockInfo, pBlockIdx) < 0) { + if (ret < pBlockIdx->len || tsdbVerifyBlockInfo((SBlockInfo *)pMem, pBlockIdx) < 0) { tsdbError("vgId:%d table %s block info part is corrupted in file %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pReadH->pTable), pFile->fname); terrno = TSDB_CODE_TDB_FILE_CORRUPTED; @@ -311,28 +314,28 @@ int tsdbLoadBlockDataInfo(SReadHandle *pReadH, SBlock *pBlock) { if (lseek(pFile->fd, pBlock->offset, SEEK_SET) < 0) { tsdbError("vgId:%d failed to lseek file %s to offset %" PRId64 " since %s", REPO_ID(pRepo), pFile->fname, - pBlock->offset, strerror(errno)); + (int64_t)pBlock->offset, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } int tsize = TSDB_BLOCK_DATA_LEN(pBlock->numOfCols); - if (tsdbAllocBuf(&(pReadH->pBlockData), tsize) < 0) { + if (tsdbAllocBuf((void **)(&(pReadH->pBlockData)), tsize) < 0) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } - int ret = taosTRead(pFile->fd, tsize); + int ret = taosTRead(pFile->fd, (void *)pReadH->pBlockData, tsize); if (ret < 0) { tsdbError("vgId:%d failed to read block data info part from file %s offset %" PRId64 " len %d since %s", - REPO_ID(pRepo), pFile->fname, pBlock->offset, tsize, strerror(errno)); + REPO_ID(pRepo), pFile->fname, (int64_t)pBlock->offset, tsize, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } if (ret < tsize || !taosCheckChecksumWhole((uint8_t *)(pReadH->pBlockData), tsize)) { tsdbError("vgId:%d block data info part is corrupted in file %s offset %" PRId64 " len %d", REPO_ID(pRepo), - pFile->fname, pBlock->offset, tsize); + pFile->fname, (int64_t)pBlock->offset, tsize); terrno = TSDB_CODE_TDB_FILE_CORRUPTED; return -1; } @@ -349,6 +352,34 @@ int tsdbLoadKeyCol(SReadHandle *pReadH, SBlockInfo *pBlockInfo, SBlock *pBlock) return tsdbLoadBlockDataCols(pReadH, pBlock, pBlockInfo, &colId, 1); } +void tsdbGetDataStatis(SReadHandle *pReadH, SDataStatis *pStatis, int numOfCols) { + SBlockData *pBlockData = pReadH->pBlockData; + + for (int i = 0, j = 0; i < numOfCols;) { + if (j >= pBlockData->numOfCols) { + pStatis[i].numOfNull = -1; + i++; + continue; + } + + if (pStatis[i].colId == pBlockData->cols[j].colId) { + pStatis[i].sum = pBlockData->cols[j].sum; + pStatis[i].max = pBlockData->cols[j].max; + pStatis[i].min = pBlockData->cols[j].min; + pStatis[i].maxIndex = pBlockData->cols[j].maxIndex; + pStatis[i].minIndex = pBlockData->cols[j].minIndex; + pStatis[i].numOfNull = pBlockData->cols[j].numOfNull; + i++; + j++; + } else if (pStatis[i].colId < pBlockData->cols[j].colId) { + pStatis[i].numOfNull = -1; + i++; + } else { + j++; + } + } +} + static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols *pDataCols) { ASSERT(pBlock->numOfSubBlocks <= 1); @@ -360,14 +391,14 @@ static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_DATA); } - if (tsdbAllocBuf(&(pReadH->pBuf), pBlock->len) < 0) { + if (tsdbAllocBuf((void **)(&(pReadH->pBuf)), pBlock->len) < 0) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } if (lseek(pFile->fd, pBlock->offset, SEEK_SET) < 0) { tsdbError("vgId:%d failed to lseek file %s to offset %" PRId64 " since %s", REPO_ID(pRepo), pFile->fname, - pBlock->offset, strerror(errno)); + (int64_t)pBlock->offset, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } @@ -375,7 +406,7 @@ static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols int ret = taosTRead(pFile->fd, (void *)(pReadH->pBuf), pBlock->len); if (ret < 0) { tsdbError("vgId:%d failed to read block data part from file %s at offset %" PRId64 " len %d since %s", - REPO_ID(pRepo), pFile->fname, pBlock->offset, pBlock->len, strerror(errno)); + REPO_ID(pRepo), pFile->fname, (int64_t)pBlock->offset, pBlock->len, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } @@ -383,7 +414,7 @@ static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols int tsize = TSDB_BLOCK_DATA_LEN(pBlock->numOfCols); if (ret < pBlock->len || !taosCheckChecksumWhole((uint8_t *)(pReadH->pBuf), tsize)) { tsdbError("vgId:%d block data part from file %s at offset %" PRId64 " len %d is corrupted", REPO_ID(pRepo), - pFile->fname, pBlock->offset, pBlock->len); + pFile->fname, (int64_t)pBlock->offset, pBlock->len); terrno = TSDB_CODE_TDB_FILE_CORRUPTED; return -1; } @@ -391,7 +422,7 @@ static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols SBlockData *pBlockData = (SBlockData *)pReadH->pBuf; ASSERT(pBlockData->delimiter == TSDB_FILE_DELIMITER); - ASSERT(pBlockData->numOfCols = pBlock->numOfCols); + ASSERT(pBlockData->numOfCols == pBlock->numOfCols); tdResetDataCols(pDataCols); ASSERT(pBlock->numOfRows <= pDataCols->maxPoints); @@ -427,7 +458,7 @@ static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols zsize += (sizeof(VarDataLenT) * pBlock->numOfRows); } - if (tsdbAllocBuf(&(pReadH->pCBuf), zsize) < 0) { + if (tsdbAllocBuf((void **)(&(pReadH->pCBuf)), zsize) < 0) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } @@ -540,12 +571,13 @@ static int tsdbLoadBlockDataColsImpl(SReadHandle *pReadH, SBlock *pBlock, SDataC } static int tsdbDecodeBlockIdxArray(SReadHandle *pReadH) { - void * pBuf = pReadH->pBuf; - SFile *pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_HEAD); + void * pBuf = pReadH->pBuf; + STsdbRepo *pRepo = pReadH->pRepo; + SFile * pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_HEAD); pReadH->nBlockIdx = 0; while (POINTER_DISTANCE(pBuf, pReadH->pBuf) < (int)(pFile->info.len - sizeof(TSCKSUM))) { - if (tsdbAllocBuf(&((void *)(pReadH->pBlockIdx)), sizeof(SBlockIdx) * (pReadH->nBlockIdx + 1)) < 0) { + if (tsdbAllocBuf((void **)(&(pReadH->pBlockIdx)), sizeof(SBlockIdx) * (pReadH->nBlockIdx + 1)) < 0) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } @@ -559,7 +591,7 @@ static int tsdbDecodeBlockIdxArray(SReadHandle *pReadH) { } pReadH->nBlockIdx++; - ASSERT(pReadH->nBlockIdx == 1 || (pReadH->pBlockIdx[pReadH->nBlockIdx-1].tid < (pReadH->pBlockIdx[pReadH->nBlockIdx-2].tid)); + ASSERT(pReadH->nBlockIdx == 1 || (pReadH->pBlockIdx[pReadH->nBlockIdx-1].tid < pReadH->pBlockIdx[pReadH->nBlockIdx-2].tid)); } ASSERT(pReadH->nBlockIdx > 0); @@ -616,7 +648,7 @@ static int tsdbLoadColData(SReadHandle *pReadH, SFile *pFile, SBlock *pBlock, SB STsdbRepo *pRepo = pReadH->pRepo; STsdbCfg * pCfg = &(pRepo->config); - if (tsdbAllocBuf(&(pReadH->pBuf), pBlockCol->len) < 0) { + if (tsdbAllocBuf((void **)(&(pReadH->pBuf)), pBlockCol->len) < 0) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } @@ -643,7 +675,7 @@ static int tsdbLoadColData(SReadHandle *pReadH, SFile *pFile, SBlock *pBlock, SB zsize += (sizeof(VarDataLenT) * pBlock->numOfRows); } - if (tsdbAllocBuf(&(pReadH->pCBuf), zsize) < 0) { + if (tsdbAllocBuf((void **)(&(pReadH->pCBuf)), zsize) < 0) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } diff --git a/src/tsdb/src/tsdbUtil.c b/src/tsdb/src/tsdbUtil.c index 0aa5b4c1a1ab4462f3e6ff35644f89ad494e61fb..e18b9be570971fbc3b8cc7c2ae92702425d00127 100644 --- a/src/tsdb/src/tsdbUtil.c +++ b/src/tsdb/src/tsdbUtil.c @@ -17,37 +17,28 @@ #include "tsdbMain.h" -#define TSDB_DATA_DIR_NAME "data" const char *tsdbFileSuffix[] = {".head", ".data", ".last", ".manifest", "meta", "config"}; -int tsdbGetFileName(char *rootDir, int type, int vid, int fid, int seq, char **fname) { - if (*fname == NULL) { - *fname = (char *)malloc(TSDB_FILENAME_LEN); - if (*fname == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - } - +int tsdbGetFileName(char *rootDir, int type, int vid, int fid, int seq, char *fname) { switch (type) { case TSDB_FILE_TYPE_HEAD: case TSDB_FILE_TYPE_DATA: case TSDB_FILE_TYPE_LAST: if (seq == 0) { // For backward compatibility - snprintf(*fname, TSDB_FILENAME_LEN, "%s/%s/v%df%d%s", rootDir, TSDB_DATA_DIR_NAME, vid, fid, + snprintf(fname, TSDB_FILENAME_LEN, "%s/%s/v%df%d%s", rootDir, TSDB_DATA_DIR_NAME, vid, fid, tsdbFileSuffix[type]); } else { - snprintf(*fname, TSDB_FILENAME_LEN, "%s/%s/v%df%d%s-%d", rootDir, TSDB_DATA_DIR_NAME, vid, fid, + snprintf(fname, TSDB_FILENAME_LEN, "%s/%s/v%df%d%s-%d", rootDir, TSDB_DATA_DIR_NAME, vid, fid, tsdbFileSuffix[type], seq); } break; case TSDB_FILE_TYPE_MANIFEST: - snprintf(*fname, TSDB_FILENAME_LEN, "%s/v%d%s", rootDir, vid, tsdbFileSuffix[type]); + snprintf(fname, TSDB_FILENAME_LEN, "%s/v%d%s", rootDir, vid, tsdbFileSuffix[type]); break; case TSDB_FILE_TYPE_META: case TSDB_FILE_TYPE_CFG: - snprintf(*fname, TSDB_FILENAME_LEN, "%s/%s", rootDir, tsdbFileSuffix[type]); + snprintf(fname, TSDB_FILENAME_LEN, "%s/%s", rootDir, tsdbFileSuffix[type]); break; default: ASSERT(0);