diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index 810067d7362b8ab9867f43b042d8dae2158702d8..3e7419f36e851a435f0adfe4841ef6d440a73eb5 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -278,7 +278,7 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows); void tdResetDataCols(SDataCols *pCols); int tdInitDataCols(SDataCols *pCols, STSchema *pSchema); SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData); -void *tdFreeDataCols(SDataCols *pCols); +SDataCols *tdFreeDataCols(SDataCols *pCols); void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols); int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge); diff --git a/src/tfs/src/tfs.c b/src/tfs/src/tfs.c index 563adc88db49f146437b4d320aa07df018f9caa7..45c9e606d20a170906d3b727544fe73910c9e7f9 100644 --- a/src/tfs/src/tfs.c +++ b/src/tfs/src/tfs.c @@ -169,7 +169,7 @@ void tfsAllocDisk(int expLevel, int *level, int *id) { while (*level >= 0) { *id = tfsAssignDisk(*level); if (*id < 0) { - *level--; + (*level)--; continue; } diff --git a/src/tsdb/inc/tsdbFile.h b/src/tsdb/inc/tsdbFile.h index 766aa78850c66028421b168b8f038ad6bad4066f..0d05df38daaa6485c67331f19c549e4bbc813283 100644 --- a/src/tsdb/inc/tsdbFile.h +++ b/src/tsdb/inc/tsdbFile.h @@ -132,6 +132,7 @@ void tsdbInitDFile(SDFile* pDFile, int vid, int fid, int ver, int level, int id void tsdbInitDFileWithOld(SDFile* pDFile, SDFile* pOldDFile); int tsdbEncodeSDFile(void** buf, SDFile* pDFile); void* tsdbDecodeSDFile(void* buf, SDFile* pDFile); +int tsdbUpdateDFileHeader(SDFile *pDFile); static FORCE_INLINE int tsdbOpenDFile(SDFile *pDFile, int flags) { ASSERT(!TSDB_FILE_OPENED(pDFile)); @@ -214,7 +215,7 @@ static FORCE_INLINE int tsdbCreateAndOpenDFile(SDFile* pDFile) { pDFile->info.size += TSDB_FILE_HEAD_SIZE; - if (tsdbUpdaeDFileHeader(pDFile) < 0) { + if (tsdbUpdateDFileHeader(pDFile) < 0) { tsdbCloseDFile(pDFile); remove(TSDB_FILE_FULL_NAME(pDFile)); return -1; diff --git a/src/tsdb/inc/tsdbKV.h b/src/tsdb/inc/tsdbKV.h new file mode 100644 index 0000000000000000000000000000000000000000..7bd270bed234399655980f895f67ef49d90daf37 --- /dev/null +++ b/src/tsdb/inc/tsdbKV.h @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_TSDB_KV_H_ +#define _TD_TSDB_KV_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#define KVSTORE_FILE_VERSION ((uint32_t)0) + +typedef int (*iterFunc)(void*, void* cont, int contLen); +typedef void (*afterFunc)(void*); + +typedef struct { + SMFile f; + SHashObj* map; + iterFunc iFunc; + afterFunc aFunc; + void* appH; +} SKVStore; + +#define KVSTORE_MAGIC(s) (s)->f.info.magic + +int tdCreateKVStore(char* fname); +int tdDestroyKVStore(char* fname); +SKVStore* tdOpenKVStore(char* fname, iterFunc iFunc, afterFunc aFunc, void* appH); +void tdCloseKVStore(SKVStore* pStore); +int tdKVStoreStartCommit(SKVStore* pStore); +int tdUpdateKVStoreRecord(SKVStore* pStore, uint64_t uid, void* cont, int contLen); +int tdDropKVStoreRecord(SKVStore* pStore, uint64_t uid); +int tdKVStoreEndCommit(SKVStore* pStore); +void tsdbGetStoreInfo(char* fname, uint32_t* magic, int64_t* size); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_TSDB_KV_H_*/ \ No newline at end of file diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h deleted file mode 100644 index c8ac9778843acf669375c83aa71de04f6d10109a..0000000000000000000000000000000000000000 --- a/src/tsdb/inc/tsdbMain.h +++ /dev/null @@ -1,311 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -#ifndef _TD_TSDB_MAIN_H_ -#define _TD_TSDB_MAIN_H_ - -#include "os.h" -#include "hash.h" -#include "tcoding.h" -#include "tglobal.h" -#include "tkvstore.h" -#include "tlist.h" -#include "tlog.h" -#include "tlockfree.h" -#include "tsdb.h" -#include "tskiplist.h" -#include "tutil.h" -#include "tchecksum.h" -#include "tfs.h" - -#ifdef __cplusplus -extern "C" { -#endif - -typedef struct STsdbRepo STsdbRepo; - -// ================= tsdbLog.h - -// ================= OTHERS - -#define TAOS_IN_RANGE(key, keyMin, keyLast) (((key) >= (keyMin)) && ((key) <= (keyMax))) - -// NOTE: Any file format change must increase this version number by 1 -// Also, implement the convert function -#define TSDB_FILE_VERSION ((uint32_t)0) - -// Definitions -// ================= tsdbMeta.c - -// ================= tsdbBuffer.c - -// ------------------ tsdbMemTable.c -// ================= tsdbFile.c -/* Statistic information of the TSDB file system. - */ -// ================= tsdbStore.c -#define KVSTORE_FILE_VERSION ((uint32_t)0) - -typedef int (*iterFunc)(void*, void* cont, int contLen); -typedef void (*afterFunc)(void*); - -typedef struct { - SMFile f; - SHashObj* map; - iterFunc iFunc; - afterFunc aFunc; - void* appH; -} SKVStore; - -#define KVSTORE_MAGIC(s) (s)->f.info.magic - -int tdCreateKVStore(char* fname); -int tdDestroyKVStore(char* fname); -SKVStore* tdOpenKVStore(char* fname, iterFunc iFunc, afterFunc aFunc, void* appH); -void tdCloseKVStore(SKVStore* pStore); -int tdKVStoreStartCommit(SKVStore* pStore); -int tdUpdateKVStoreRecord(SKVStore* pStore, uint64_t uid, void* cont, int contLen); -int tdDropKVStoreRecord(SKVStore* pStore, uint64_t uid); -int tdKVStoreEndCommit(SKVStore* pStore); -void tsdbGetStoreInfo(char* fname, uint32_t* magic, int64_t* size); - -// ================= -// extern const char* tsdbFileSuffix[]; - -// minFid <= midFid <= maxFid -// typedef struct { -// int minFid; // >= minFid && < midFid, at level 2 -// int midFid; // >= midFid && < maxFid, at level 1 -// int maxFid; // >= maxFid, at level 0 -// } SFidGroup; - -// typedef enum { -// TSDB_FILE_TYPE_HEAD = 0, -// TSDB_FILE_TYPE_DATA, -// TSDB_FILE_TYPE_LAST, -// TSDB_FILE_TYPE_STAT, -// TSDB_FILE_TYPE_NHEAD, -// TSDB_FILE_TYPE_NDATA, -// TSDB_FILE_TYPE_NLAST, -// TSDB_FILE_TYPE_NSTAT -// } TSDB_FILE_TYPE; - -// #ifndef TDINTERNAL -// #define TSDB_FILE_TYPE_MAX (TSDB_FILE_TYPE_LAST+1) -// #else -// #define TSDB_FILE_TYPE_MAX (TSDB_FILE_TYPE_STAT+1) -// #endif - -// typedef struct { -// uint32_t magic; -// uint32_t len; -// uint32_t totalBlocks; -// uint32_t totalSubBlocks; -// uint32_t offset; -// uint64_t size; // total size of the file -// uint64_t tombSize; // unused file size -// } STsdbFileInfo; - -// typedef struct { -// TFILE file; -// STsdbFileInfo info; -// int fd; -// } SFile; - -// typedef struct { -// int fileId; -// int state; // 0 for health, 1 for problem -// SFile files[TSDB_FILE_TYPE_MAX]; -// } SFileGroup; - -// typedef struct { -// pthread_rwlock_t fhlock; - -// int maxFGroups; -// int nFGroups; -// SFileGroup* pFGroup; -// } STsdbFileH; - -// typedef struct { -// int direction; -// STsdbFileH* pFileH; -// int fileId; -// int index; -// } SFileGroupIter; - -// #define TSDB_FILE_NAME(pFile) ((pFile)->file.aname) -#define TSDB_KEY_FILEID(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile)) -// #define TSDB_MAX_FILE(keep, daysPerFile) ((keep) / (daysPerFile) + 3) -// #define TSDB_MIN_FILE_ID(fh) (fh)->pFGroup[0].fileId -// #define TSDB_MAX_FILE_ID(fh) (fh)->pFGroup[(fh)->nFGroups - 1].fileId -// #define TSDB_IS_FILE_OPENED(f) ((f)->fd > 0) -// #define TSDB_FGROUP_ITER_FORWARD TSDB_ORDER_ASC -// #define TSDB_FGROUP_ITER_BACKWARD TSDB_ORDER_DESC - -// STsdbFileH* tsdbNewFileH(STsdbCfg* pCfg); -// void tsdbFreeFileH(STsdbFileH* pFileH); -// int tsdbOpenFileH(STsdbRepo* pRepo); -// void tsdbCloseFileH(STsdbRepo* pRepo, bool isRestart); -// SFileGroup *tsdbCreateFGroup(STsdbRepo *pRepo, int fid, int level); -// void tsdbInitFileGroupIter(STsdbFileH* pFileH, SFileGroupIter* pIter, int direction); -// void tsdbSeekFileGroupIter(SFileGroupIter* pIter, int fid); -// SFileGroup* tsdbGetFileGroupNext(SFileGroupIter* pIter); -// int tsdbOpenFile(SFile* pFile, int oflag); -// void tsdbCloseFile(SFile* pFile); -// int tsdbCreateFile(SFile* pFile, STsdbRepo* pRepo, int fid, int type); -// SFileGroup* tsdbSearchFGroup(STsdbFileH* pFileH, int fid, int flags); -// int tsdbGetFidLevel(int fid, SFidGroup fidg); -// void tsdbRemoveFilesBeyondRetention(STsdbRepo* pRepo, SFidGroup* pFidGroup); -// int tsdbUpdateFileHeader(SFile* pFile); -// int tsdbEncodeSFileInfo(void** buf, const STsdbFileInfo* pInfo); -// void* tsdbDecodeSFileInfo(void* buf, STsdbFileInfo* pInfo); -// void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup); -// int tsdbLoadFileHeader(SFile* pFile, uint32_t* version); -// void tsdbGetFileInfoImpl(char* fname, uint32_t* magic, int64_t* size); -// void tsdbGetFidGroup(STsdbCfg* pCfg, SFidGroup* pFidGroup); -// int tsdbApplyRetention(STsdbRepo* pRepo, SFidGroup *pFidGroup); - -// ================= tsdbMain.c - - -#include "tsdbReadImpl.h" - -#if 0 -// ================= tsdbRWHelper.c - -typedef enum { TSDB_WRITE_HELPER, TSDB_READ_HELPER } tsdb_rw_helper_t; - -typedef struct { - TSKEY minKey; - TSKEY maxKey; - SDFileSet rSet; - SDFileSet wSet; -} SHelperFile; - -typedef struct { - uint64_t uid; - int32_t tid; -} SHelperTable; - -typedef struct { - SBlockIdx* pIdxArray; - int numOfIdx; - int curIdx; -} SIdxH; - -typedef struct { - tsdb_rw_helper_t type; - - STsdbRepo* pRepo; - int8_t state; - // For file set usage - SHelperFile files; - SIdxH idxH; - SBlockIdx curCompIdx; - void* pWIdx; - // For table set usage - SHelperTable tableInfo; - SBlockInfo* pCompInfo; - bool hasOldLastBlock; - // For block set usage - SBlockData* pCompData; - SDataCols* pDataCols[2]; - void* pBuffer; // Buffer to hold the whole data block - void* compBuffer; // Buffer for temperary compress/decompress purpose -} SRWHelper; - -#define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state -#define TSDB_HELPER_FILE_SET_AND_OPEN 0x1 // File is set -#define TSDB_HELPER_IDX_LOAD 0x2 // SBlockIdx part is loaded -#define TSDB_HELPER_TABLE_SET 0x4 // Table is set -#define TSDB_HELPER_INFO_LOAD 0x8 // SBlockInfo part is loaded -#define TSDB_HELPER_FILE_DATA_LOAD 0x10 // SBlockData part is loaded -#define helperSetState(h, s) (((h)->state) |= (s)) -#define helperClearState(h, s) ((h)->state &= (~(s))) -#define helperHasState(h, s) ((((h)->state) & (s)) == (s)) -#define blockAtIdx(h, idx) ((h)->pCompInfo->blocks + idx) -#define TSDB_MAX_SUBBLOCKS 8 -#define IS_SUB_BLOCK(pBlock) ((pBlock)->numOfSubBlocks == 0) -#define helperType(h) (h)->type -#define helperRepo(h) (h)->pRepo -#define helperState(h) (h)->state -#define TSDB_NLAST_FILE_OPENED(h) ((h)->files.nLastF.fd > 0) -#define helperFileId(h) ((h)->files.fGroup.fileId) -#define helperHeadF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_HEAD])) -#define helperDataF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_DATA])) -#define helperLastF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_LAST])) -#define helperNewHeadF(h) (&((h)->files.nHeadF)) -#define helperNewLastF(h) (&((h)->files.nLastF)) - -int tsdbInitReadHelper(SRWHelper* pHelper, STsdbRepo* pRepo); -int tsdbInitWriteHelper(SRWHelper* pHelper, STsdbRepo* pRepo); -void tsdbDestroyHelper(SRWHelper* pHelper); -void tsdbResetHelper(SRWHelper* pHelper); -int tsdbSetAndOpenHelperFile(SRWHelper* pHelper, SFileGroup* pGroup); -int tsdbCloseHelperFile(SRWHelper* pHelper, bool hasError, SFileGroup* pGroup); -int tsdbSetHelperTable(SRWHelper* pHelper, STable* pTable, STsdbRepo* pRepo); -int tsdbCommitTableData(SRWHelper* pHelper, SCommitIter* pCommitIter, SDataCols* pDataCols, TSKEY maxKey); -int tsdbMoveLastBlockIfNeccessary(SRWHelper* pHelper); -int tsdbWriteCompInfo(SRWHelper* pHelper); -int tsdbWriteCompIdx(SRWHelper* pHelper); -int tsdbLoadCompIdxImpl(SFile* pFile, uint32_t offset, uint32_t len, void* buffer); -int tsdbDecodeSBlockIdxImpl(void* buffer, uint32_t len, SBlockIdx** ppCompIdx, int* numOfIdx); -int tsdbLoadCompIdx(SRWHelper* pHelper, void* target); -int tsdbLoadCompInfoImpl(SFile* pFile, SBlockIdx* pIdx, SBlockInfo** ppCompInfo); -int tsdbLoadCompInfo(SRWHelper* pHelper, void* target); -int tsdbLoadCompData(SRWHelper* phelper, SBlock* pcompblock, void* target); -void tsdbGetDataStatis(SRWHelper* pHelper, SDataStatis* pStatis, int numOfCols); -int tsdbLoadBlockDataCols(SRWHelper* pHelper, SBlock* pCompBlock, SBlockInfo* pCompInfo, int16_t* colIds, - int numOfColIds); -int tsdbLoadBlockData(SRWHelper* pHelper, SBlock* pCompBlock, SBlockInfo* pCompInfo); - -static FORCE_INLINE int compTSKEY(const void* key1, const void* key2) { - if (*(TSKEY*)key1 > *(TSKEY*)key2) { - return 1; - } else if (*(TSKEY*)key1 == *(TSKEY*)key2) { - return 0; - } else { - return -1; - } -} - -#endif - -// ================= tsdbScan.c -typedef struct { - SFileGroup fGroup; - int numOfIdx; - SBlockIdx* pCompIdx; - SBlockInfo* pCompInfo; - void* pBuf; - FILE* tLogStream; -} STsdbScanHandle; - -int tsdbScanFGroup(STsdbScanHandle* pScanHandle, char* rootDir, int fid); -STsdbScanHandle* tsdbNewScanHandle(); -void tsdbSetScanLogStream(STsdbScanHandle* pScanHandle, FILE* fLogStream); -int tsdbSetAndOpenScanFile(STsdbScanHandle* pScanHandle, char* rootDir, int fid); -int tsdbScanSBlockIdx(STsdbScanHandle* pScanHandle); -int tsdbScanSBlock(STsdbScanHandle* pScanHandle, int idx); -int tsdbCloseScanFile(STsdbScanHandle* pScanHandle); -void tsdbFreeScanHandle(STsdbScanHandle* pScanHandle); - -// ------------------ tsdbCommitQueue.c -int tsdbScheduleCommit(STsdbRepo *pRepo); - -#ifdef __cplusplus -} -#endif - -#endif \ No newline at end of file diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 5a9557ff8f2f328c85188094a25d611860488eba..1d49d31eb1bcbd10155fb5929bd241b8999f39ea 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -119,6 +119,11 @@ void *tsdbDecodeSDFile(void *buf, SDFile *pDFile) { return buf; } +int tsdbUpdateDFileHeader(SDFile *pDFile) { + // TODO + return 0; +} + static int tsdbCopyDFile(SDFile *pSrc, int tolevel, int toid, SDFile *pDest) { TSDB_FILE_SET_CLOSED(pDest); diff --git a/src/tsdb/src/tsdbFile_bak.c b/src/tsdb/src/tsdbFile_bak.c deleted file mode 100644 index 411c1d796ed879cac7362c4b60a043ef34ba67da..0000000000000000000000000000000000000000 --- a/src/tsdb/src/tsdbFile_bak.c +++ /dev/null @@ -1,656 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -#define _DEFAULT_SOURCE -#define TAOS_RANDOM_FILE_FAIL_TEST -#include -#include "os.h" -#include "talgo.h" -#include "tchecksum.h" -#include "tsdbMain.h" -#include "tutil.h" -#include "tfs.h" -#include "tarray.h" - -const char *tsdbFileSuffix[] = {".head", ".data", ".last", ".stat", ".h", ".d", ".l", ".s"}; - -static int compFGroup(const void *arg1, const void *arg2); -static int keyFGroupCompFunc(const void *key, const void *fgroup); -static void *tsdbScanAllFiles(STsdbRepo *pRepo); -static int tsdbCompareFile(const void *arg1, const void *arg2); -static int tsdbRestoreFile(STsdbRepo *pRepo, TFILE *pfiles, int nfile); -static void tsdbParseFname(const char *bname, int *vid, int *fid, char *suffix); - -// STsdbFileH =========================================== -STsdbFileH *tsdbNewFileH(STsdbCfg *pCfg) { - STsdbFileH *pFileH = (STsdbFileH *)calloc(1, sizeof(*pFileH)); - if (pFileH == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - goto _err; - } - - int code = pthread_rwlock_init(&(pFileH->fhlock), NULL); - if (code != 0) { - tsdbError("vgId:%d failed to init file handle lock since %s", pCfg->tsdbId, strerror(code)); - terrno = TAOS_SYSTEM_ERROR(code); - goto _err; - } - - pFileH->maxFGroups = TSDB_MAX_FILE(pCfg->keep, pCfg->daysPerFile); - - pFileH->pFGroup = (SFileGroup *)calloc(pFileH->maxFGroups, sizeof(SFileGroup)); - if (pFileH->pFGroup == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - goto _err; - } - - return pFileH; - -_err: - tsdbFreeFileH(pFileH); - return NULL; -} - -void tsdbFreeFileH(STsdbFileH *pFileH) { - if (pFileH) { - pthread_rwlock_destroy(&pFileH->fhlock); - tfree(pFileH->pFGroup); - free(pFileH); - } -} - -int tsdbOpenFileH(STsdbRepo *pRepo) { - ASSERT(pRepo != NULL && pRepo->tsdbFileH != NULL); - - void *pfArray = NULL; - - // Scan the whole directory and get data - pfArray = tsdbScanAllFiles(pRepo); - if (pfArray == NULL) { - return -1; - } - - if (taosArrayGetSize(pfArray) == 0) { - taosArrayDestroy(pfArray); - return 0; - } - - // Sort the files - taosArraySort(pfArray, tsdbCompareFile); - - // Loop to recover the files - int iter = 0; - while (true) { - if (iter >= taosArrayGetSize(pfArray)) break; - - int vid, fid; - char bname[TSDB_FILENAME_LEN] = "\0"; - char suffix[TSDB_FILENAME_LEN] = "\0"; - int count = 0; - - TFILE *pf = taosArrayGet(pfArray, iter); - tfsbasename(pf, bname); - tsdbParseFname(bname, &vid, &fid, suffix); - count++; - iter++; - - while (true) { - int nfid = 0; - if (iter >= taosArrayGetSize(pfArray)) break; - TFILE *npf = taosArrayGet(pfArray, iter); - tfsbasename(npf, bname); - tsdbParseFname(bname, &vid, &nfid, suffix); - - if (nfid != fid) break; - count++; - iter++; - } - - tsdbRestoreFile(pRepo, pf, count); - } - - taosArrayDestroy(pfArray); - return 0; -} - -void tsdbCloseFileH(STsdbRepo *pRepo, bool isRestart) { - STsdbFileH *pFileH = pRepo->tsdbFileH; - - for (int i = 0; i < pFileH->nFGroups; i++) { - SFileGroup *pFGroup = pFileH->pFGroup + i; - for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { - tsdbCloseFile(&(pFGroup->files[type])); - } - if (isRestart) { - tfsDecDiskFile(pFGroup->files[0].file.level, pFGroup->files[0].file.level, TSDB_FILE_TYPE_MAX); - } - } -} - -// SFileGroup =========================================== -SFileGroup *tsdbCreateFGroup(STsdbRepo *pRepo, int fid, int level) { - STsdbFileH *pFileH = pRepo->tsdbFileH; - SFileGroup fg = {0}; - int id = TFS_UNDECIDED_ID; - char fname[TSDB_FILENAME_LEN] = "\0"; - - ASSERT(tsdbSearchFGroup(pFileH, fid, TD_EQ) == NULL); - ASSERT(pFileH->nFGroups < pFileH->maxFGroups); - - // SET FILE GROUP - fg.fileId = fid; - - // CREATE FILES - for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { - SFile *pFile = &(fg.files[type]); - - pFile->fd = -1; - pFile->info.size = TSDB_FILE_HEAD_SIZE; - pFile->info.magic = TSDB_FILE_INIT_MAGIC; - - tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), fid, type, fname); - tfsInitFile(&pFile->file, level, id, fname); - - if (tsdbOpenFile(pFile, O_WRONLY|O_CREAT) < 0) return NULL; - - if (tsdbUpdateFileHeader(pFile) < 0) { - tsdbCloseFile(pFile); - return NULL; - } - - tsdbCloseFile(pFile); - - level = TFILE_LEVEL(&(pFile->file)); - id = TFILE_ID(&(pFile->file)); - } - - // PUT GROUP INTO FILE HANDLE - pthread_rwlock_wrlock(&pFileH->fhlock); - pFileH->pFGroup[pFileH->nFGroups++] = fg; - qsort((void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), compFGroup); - pthread_rwlock_unlock(&pFileH->fhlock); - - SFileGroup *pfg = tsdbSearchFGroup(pFileH, fid, TD_EQ); - ASSERT(pfg != NULL); - return pfg; -} - -void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) { - ASSERT(pFGroup != NULL); - STsdbFileH *pFileH = pRepo->tsdbFileH; - - SFileGroup fg = *pFGroup; - - int nFilesLeft = pFileH->nFGroups - (int)(POINTER_DISTANCE(pFGroup, pFileH->pFGroup) / sizeof(SFileGroup) + 1); - if (nFilesLeft > 0) { - memmove((void *)pFGroup, POINTER_SHIFT(pFGroup, sizeof(SFileGroup)), sizeof(SFileGroup) * nFilesLeft); - } - - pFileH->nFGroups--; - ASSERT(pFileH->nFGroups >= 0); - - for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { - SFile *pFile = &(fg.files[type]); - tfsremove(&(pFile->file)); - } -} - -SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid, int flags) { - void *ptr = taosbsearch((void *)(&fid), (void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), - keyFGroupCompFunc, flags); - if (ptr == NULL) return NULL; - return (SFileGroup *)ptr; -} - -int tsdbGetFidLevel(int fid, SFidGroup fidg) { - if (fid >= fidg.maxFid) { - return 0; - } else if (fid >= fidg.midFid) { - return 1; - } else if (fid >= fidg.minFid) { - return 2; - } else { - return -1; - } -} - -static int compFGroup(const void *arg1, const void *arg2) { - int val1 = ((SFileGroup *)arg1)->fileId; - int val2 = ((SFileGroup *)arg2)->fileId; - - if (val1 < val2) { - return -1; - } else if (val1 > val2) { - return 1; - } else { - return 0; - } -} - -static int keyFGroupCompFunc(const void *key, const void *fgroup) { - int fid = *(int *)key; - SFileGroup *pFGroup = (SFileGroup *)fgroup; - if (fid == pFGroup->fileId) { - return 0; - } else { - return fid > pFGroup->fileId ? 1 : -1; - } -} - -// SFileGroupIter =========================================== -void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direction) { - pIter->pFileH = pFileH; - pIter->direction = direction; - - if (pFileH->nFGroups == 0) { - pIter->index = -1; - pIter->fileId = -1; - } else { - if (direction == TSDB_FGROUP_ITER_FORWARD) { - pIter->index = 0; - } else { - pIter->index = pFileH->nFGroups - 1; - } - pIter->fileId = pFileH->pFGroup[pIter->index].fileId; - } -} - -void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) { - STsdbFileH *pFileH = pIter->pFileH; - - if (pFileH->nFGroups == 0) { - pIter->index = -1; - pIter->fileId = -1; - return; - } - - int flags = (pIter->direction == TSDB_FGROUP_ITER_FORWARD) ? TD_GE : TD_LE; - void *ptr = taosbsearch(&fid, (void *)pFileH->pFGroup, pFileH->nFGroups, sizeof(SFileGroup), keyFGroupCompFunc, flags); - if (ptr == NULL) { - pIter->index = -1; - pIter->fileId = -1; - } else { - pIter->index = (int)(POINTER_DISTANCE(ptr, pFileH->pFGroup) / sizeof(SFileGroup)); - pIter->fileId = ((SFileGroup *)ptr)->fileId; - } -} - -SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) { - STsdbFileH *pFileH = pIter->pFileH; - SFileGroup *pFGroup = NULL; - - if (pIter->index < 0 || pIter->index >= pFileH->nFGroups || pIter->fileId < 0) return NULL; - - pFGroup = &pFileH->pFGroup[pIter->index]; - if (pFGroup->fileId != pIter->fileId) { - tsdbSeekFileGroupIter(pIter, pIter->fileId); - } - - if (pIter->index < 0) return NULL; - - pFGroup = &pFileH->pFGroup[pIter->index]; - ASSERT(pFGroup->fileId == pIter->fileId); - - if (pIter->direction == TSDB_FGROUP_ITER_FORWARD) { - pIter->index++; - } else { - pIter->index--; - } - - if (pIter->index >= 0 && pIter->index < pFileH->nFGroups) { - pIter->fileId = pFileH->pFGroup[pIter->index].fileId; - } else { - pIter->fileId = -1; - } - - return pFGroup; -} - -// SFile =========================================== -int tsdbOpenFile(SFile *pFile, int oflag) { - ASSERT(!TSDB_IS_FILE_OPENED(pFile)); - - pFile->fd = tfsopen(&(pFile->file), oflag); - if (pFile->fd < 0) { - tsdbError("failed to open file %s since %s", TSDB_FILE_NAME(pFile), tstrerror(terrno)); - return -1; - } - - tsdbTrace("open file %s, fd %d", TSDB_FILE_NAME(pFile), pFile->fd); - - return 0; -} - -void tsdbCloseFile(SFile *pFile) { - if (TSDB_IS_FILE_OPENED(pFile)) { - tsdbTrace("close file %s, fd %d", TSDB_FILE_NAME(pFile), pFile->fd); - tfsclose(pFile->fd); - pFile->fd = -1; - } -} - -int tsdbUpdateFileHeader(SFile *pFile) { - char buf[TSDB_FILE_HEAD_SIZE] = "\0"; - - void *pBuf = (void *)buf; - taosEncodeFixedU32((void *)(&pBuf), TSDB_FILE_VERSION); - tsdbEncodeSFileInfo((void *)(&pBuf), &(pFile->info)); - - taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE); - - if (lseek(pFile->fd, 0, SEEK_SET) < 0) { - tsdbError("failed to lseek file %s since %s", TSDB_FILE_NAME(pFile), strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - if (taosWrite(pFile->fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) { - tsdbError("failed to write %d bytes to file %s since %s", TSDB_FILE_HEAD_SIZE, TSDB_FILE_NAME(pFile), - strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - return 0; -} - -int tsdbEncodeSFileInfo(void **buf, const STsdbFileInfo *pInfo) { - int tlen = 0; - tlen += taosEncodeFixedU32(buf, pInfo->magic); - tlen += taosEncodeFixedU32(buf, pInfo->len); - tlen += taosEncodeFixedU32(buf, pInfo->totalBlocks); - tlen += taosEncodeFixedU32(buf, pInfo->totalSubBlocks); - tlen += taosEncodeFixedU32(buf, pInfo->offset); - tlen += taosEncodeFixedU64(buf, pInfo->size); - tlen += taosEncodeFixedU64(buf, pInfo->tombSize); - - return tlen; -} - -void *tsdbDecodeSFileInfo(void *buf, STsdbFileInfo *pInfo) { - buf = taosDecodeFixedU32(buf, &(pInfo->magic)); - buf = taosDecodeFixedU32(buf, &(pInfo->len)); - buf = taosDecodeFixedU32(buf, &(pInfo->totalBlocks)); - buf = taosDecodeFixedU32(buf, &(pInfo->totalSubBlocks)); - buf = taosDecodeFixedU32(buf, &(pInfo->offset)); - buf = taosDecodeFixedU64(buf, &(pInfo->size)); - buf = taosDecodeFixedU64(buf, &(pInfo->tombSize)); - - return buf; -} - -int tsdbLoadFileHeader(SFile *pFile, uint32_t *version) { - char buf[TSDB_FILE_HEAD_SIZE] = "\0"; - - if (lseek(pFile->fd, 0, SEEK_SET) < 0) { - tsdbError("failed to lseek file %s to start since %s", TSDB_FILE_NAME(pFile), strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - if (taosRead(pFile->fd, buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) { - tsdbError("failed to read file %s header part with %d bytes, reason:%s", TSDB_FILE_NAME(pFile), TSDB_FILE_HEAD_SIZE, - strerror(errno)); - terrno = TSDB_CODE_TDB_FILE_CORRUPTED; - return -1; - } - - if (!taosCheckChecksumWhole((uint8_t *)buf, TSDB_FILE_HEAD_SIZE)) { - tsdbError("file %s header part is corrupted with failed checksum", TSDB_FILE_NAME(pFile)); - terrno = TSDB_CODE_TDB_FILE_CORRUPTED; - return -1; - } - - void *pBuf = (void *)buf; - pBuf = taosDecodeFixedU32(pBuf, version); - pBuf = tsdbDecodeSFileInfo(pBuf, &(pFile->info)); - - return 0; -} - -void tsdbGetFileInfoImpl(char *fname, uint32_t *magic, int64_t *size) { // TODO - uint32_t version = 0; - SFile file; - SFile * pFile = &file; - - tfsInitFile(&(pFile->file), TFS_PRIMARY_LEVEL, TFS_PRIMARY_ID, fname); - pFile->fd = -1; - - if (tsdbOpenFile(pFile, O_RDONLY) < 0) goto _err; - if (tsdbLoadFileHeader(pFile, &version) < 0) goto _err; - - off_t offset = lseek(pFile->fd, 0, SEEK_END); - if (offset < 0) goto _err; - tsdbCloseFile(pFile); - - *magic = pFile->info.magic; - *size = offset; - - return; - -_err: - tsdbCloseFile(pFile); - *magic = TSDB_FILE_INIT_MAGIC; - *size = 0; -} - -// Retention =========================================== -void tsdbRemoveFilesBeyondRetention(STsdbRepo *pRepo, SFidGroup *pFidGroup) { - STsdbFileH *pFileH = pRepo->tsdbFileH; - SFileGroup *pGroup = pFileH->pFGroup; - - pthread_rwlock_wrlock(&(pFileH->fhlock)); - - while (pFileH->nFGroups > 0 && pGroup[0].fileId < pFidGroup->minFid) { - tsdbRemoveFileGroup(pRepo, pGroup); - } - - pthread_rwlock_unlock(&(pFileH->fhlock)); -} - -void tsdbGetFidGroup(STsdbCfg *pCfg, SFidGroup *pFidGroup) { - TSKEY now = taosGetTimestamp(pCfg->precision); - - pFidGroup->minFid = - TSDB_KEY_FILEID(now - pCfg->keep * tsMsPerDay[pCfg->precision], pCfg->daysPerFile, pCfg->precision); - pFidGroup->midFid = - TSDB_KEY_FILEID(now - pCfg->keep2 * tsMsPerDay[pCfg->precision], pCfg->daysPerFile, pCfg->precision); - pFidGroup->maxFid = - TSDB_KEY_FILEID(now - pCfg->keep1 * tsMsPerDay[pCfg->precision], pCfg->daysPerFile, pCfg->precision); -} - -int tsdbApplyRetention(STsdbRepo *pRepo, SFidGroup *pFidGroup) { - STsdbFileH *pFileH = pRepo->tsdbFileH; - - for (int i = 0; i < pFileH->nFGroups; i++) { - SFileGroup ofg = pFileH->pFGroup[i]; - - int level = tsdbGetFidLevel(ofg.fileId, *pFidGroup); - ASSERT(level >= 0); - - if (level == ofg.files[0].file.level) continue; - - // COPY THE FILE GROUP TO THE RIGHT LEVEL - SFileGroup nfg = ofg; - int id = TFS_UNDECIDED_ID; - int type = 0; - for (; type < TSDB_FILE_TYPE_MAX; type++) { - tfsInitFile(&nfg.files[type].file, level, id, nfg.files[type].file.rname); - if (tfscopy(&(ofg.files[type].file), &(nfg.files[type].file)) < 0) { - if (terrno == TSDB_CODE_FS_INVLD_LEVEL) break; - tsdbError("vgId:%d failed to move fid %d from level %d to level %d since %s", REPO_ID(pRepo), ofg.fileId, - ofg.files[0].file.level, level, strerror(terrno)); - return -1; - } - - id = nfg.files[type].file.level; - id = nfg.files[type].file.id; - } - - if (type < TSDB_FILE_TYPE_MAX) continue; - - // Register new file into TSDB - pthread_rwlock_wrlock(&(pFileH->fhlock)); - pFileH->pFGroup[i] = nfg; - pthread_rwlock_unlock(&(pFileH->fhlock)); - - for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { - SFile *pFile = &(ofg.files[type]); - tfsremove(&(pFile->file)); - } - - tsdbDebug("vgId:%d move file group %d from level %d to level %d", REPO_ID(pRepo), ofg.fileId, - ofg.files[0].file.level, level); - } - - return 0; -} - -static void *tsdbScanAllFiles(STsdbRepo *pRepo) { - void * farray = NULL; - TDIR * tdir = NULL; - char dirName[TSDB_FILENAME_LEN] = "\0"; - char bname[TSDB_FILENAME_LEN] = "\0"; - regex_t regex1 = {0}; - const TFILE *pf = NULL; - - farray = taosArrayInit(256, sizeof(TFILE)); - if (farray == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return NULL; - } - - regcomp(®ex1, "^v[0-9]+f[0-9]+\\.(head|data|last|stat|l|d|h|s)$", REG_EXTENDED); - - snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/data", REPO_ID(pRepo)); - - tdir = tfsOpendir(dirName); - - while ((pf = tfsReaddir(tdir)) != NULL) { - tfsbasename(pf, bname); - - int code = regexec(®ex1, bname, 0, NULL, 0); - if (code != 0) { - tsdbWarn("vgId:%d file %s exists, ignore it", REPO_ID(pRepo), pf->aname); - continue; - } - - taosArrayPush(farray, pf); - } - - regfree(®ex1); - tfsClosedir(tdir); - - return farray; -} - -static int tsdbCompareFile(const void *arg1, const void *arg2) { - char bname1[TSDB_FILENAME_LEN] = "\0"; - char bname2[TSDB_FILENAME_LEN] = "\0"; - TFILE *pf1 = (TFILE *)arg1; - TFILE *pf2 = (TFILE *)arg2; - int vid1, fid1, vid2, fid2; - - tfsbasename(pf1, bname1); - tfsbasename(pf2, bname2); - - sscanf(bname1, "v%df%d", &vid1, &fid1); - sscanf(bname2, "v%df%d", &vid2, &fid2); - - ASSERT(vid1 == vid2); - if (fid1 < fid2) { - return -1; - } else if (fid1 == fid2) { - return 0; - } else { - return 1; - } -} - -static int tsdbRestoreFile(STsdbRepo *pRepo, TFILE *pfiles, int nfile) { - char backname[TSDB_FILENAME_LEN*2] = "\0"; - char bname[TSDB_FILENAME_LEN] = "\0"; - STsdbFileH *pFileH = pRepo->tsdbFileH; - TFILE * pfArray[TSDB_FILE_TYPE_MAX] = {0}; - TFILE * pHf = NULL; - TFILE * pLf = NULL; - SFileGroup fg = {0}; - int vid = 0; - int fid = 0; - char suffix[TSDB_FILENAME_LEN] = "\0"; - - for (int i = 0; i < nfile; i++) { - TFILE *pf = pfiles + i; - - tfsbasename(pf, bname); - tsdbParseFname(bname, &vid, &fid, suffix); - - if (strcmp(suffix, ".head") == 0) { - pfArray[TSDB_FILE_TYPE_HEAD] = pf; - } else if (strcmp(suffix, ".data") == 0) { - pfArray[TSDB_FILE_TYPE_DATA] = pf; - } else if (strcmp(suffix, ".last") == 0) { - pfArray[TSDB_FILE_TYPE_LAST] = pf; - } else if (strcmp(suffix, ".l") == 0) { - pLf = pf; - } else if (strcmp(suffix, ".h") == 0) { - pHf = pf; - } else { - tsdbWarn("vgId:%d invalid file %s exists, ignore it", REPO_ID(pRepo), pf->aname); - } - } - - if (pfArray[TSDB_FILE_TYPE_HEAD] == NULL || pfArray[TSDB_FILE_TYPE_DATA] == NULL || pfArray[TSDB_FILE_TYPE_LAST] == NULL) { - for (int i = 0; i < nfile; i++) { - snprintf(backname, TSDB_FILENAME_LEN*2, "%s_bak", (pfiles + i)->aname); - rename((pfiles + i)->aname, backname); - } - - return -1; - } - - if (pHf == NULL) { - if (pLf != NULL) { - rename(pLf->aname, pfArray[TSDB_FILE_TYPE_LAST]->aname); - } - } else { - if (pLf != NULL) { - remove(pLf->aname); - } - remove(pHf->aname); - } - - // Register file - fg.fileId = fid; - - for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { - SFile * pFile = fg.files + type; - uint32_t version = 0; - - pFile->fd = -1; - pFile->file = *pfArray[type]; // TODO - tsdbOpenFile(pFile, O_RDONLY); - tsdbLoadFileHeader(pFile, &version); - tsdbCloseFile(pFile); - } - - pFileH->pFGroup[pFileH->nFGroups++] = fg; - - tfsIncDiskFile(pfArray[TSDB_FILE_TYPE_HEAD]->level, pfArray[TSDB_FILE_TYPE_HEAD]->id, TSDB_FILE_TYPE_MAX); - - return 0; -} - -static void tsdbParseFname(const char *bname, int *vid, int *fid, char *suffix) { - sscanf(bname, "v%df%d%s", vid, fid, suffix); -} \ No newline at end of file diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c deleted file mode 100644 index fb2fc36efd5353b24fb0b67c2677caeda940b81e..0000000000000000000000000000000000000000 --- a/src/tsdb/src/tsdbRWHelper.c +++ /dev/null @@ -1,1761 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#define TAOS_RANDOM_FILE_FAIL_TEST -#include "os.h" -#include "talgo.h" -#include "tchecksum.h" -#include "tcoding.h" -#include "tscompression.h" -#include "tsdbMain.h" - -#define TSDB_GET_COMPCOL_LEN(nCols) (sizeof(SBlockData) + sizeof(SBlockCol) * (nCols) + sizeof(TSCKSUM)) -#define TSDB_KEY_COL_OFFSET 0 -#define TSDB_GET_COMPBLOCK_IDX(h, b) (POINTER_DISTANCE(b, (h)->pCompInfo->blocks)/sizeof(SBlock)) -#define TSDB_IS_LAST_BLOCK(pb) ((pb)->last) - -static bool tsdbShouldCreateNewLast(SRWHelper *pHelper); -static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, SBlock *pCompBlock, - bool isLast, bool isSuperBlock); -static int compareKeyBlock(const void *arg1, const void *arg2); -static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize); -static int tsdbInsertSuperBlock(SRWHelper *pHelper, SBlock *pCompBlock, int blkIdx); -static int tsdbAddSubBlock(SRWHelper *pHelper, SBlock *pCompBlock, int blkIdx, SMergeInfo *pMergeInfo); -static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SBlock *pCompBlock, int blkIdx); -static void tsdbResetHelperFileImpl(SRWHelper *pHelper); -static int tsdbInitHelperFile(SRWHelper *pHelper); -static void tsdbDestroyHelperFile(SRWHelper *pHelper); -static void tsdbResetHelperTableImpl(SRWHelper *pHelper); -static void tsdbResetHelperTable(SRWHelper *pHelper); -static void tsdbInitHelperTable(SRWHelper *pHelper); -static void tsdbDestroyHelperTable(SRWHelper *pHelper); -static void tsdbResetHelperBlockImpl(SRWHelper *pHelper); -static void tsdbResetHelperBlock(SRWHelper *pHelper); -static int tsdbInitHelperBlock(SRWHelper *pHelper); -static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t type); -static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32_t len, int8_t comp, int numOfRows, - int maxPoints, char *buffer, int bufferSize); -static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SBlock *pCompBlock, SDataCols *pDataCols, int16_t *colIds, - int numOfColIds); -static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SBlock *pCompBlock, SDataCols *pDataCols); -static int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx); -static void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx); -static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey); -static void tsdbDestroyHelperBlock(SRWHelper *pHelper); -static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SBlock *pCompBlock, SBlockCol *pCompCol, - SDataCol *pDataCol); -static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols, SBlock *pCompBlock); -static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey, - int *blkIdx); -static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, - TSKEY maxKey, int maxRows, int8_t update); -static bool tsdbCheckAddSubBlockCond(SRWHelper *pHelper, SBlock *pCompBlock, SMergeInfo *pMergeInfo, int maxOps); -static int tsdbDeleteSuperBlock(SRWHelper *pHelper, int blkIdx); - -// ---------------------- INTERNAL FUNCTIONS ---------------------- -int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo) { - return tsdbInitHelper(pHelper, pRepo, TSDB_READ_HELPER); -} - -int tsdbInitWriteHelper(SRWHelper *pHelper, STsdbRepo *pRepo) { - return tsdbInitHelper(pHelper, pRepo, TSDB_WRITE_HELPER); -} - -void tsdbDestroyHelper(SRWHelper *pHelper) { - if (pHelper) { - taosTZfree(pHelper->pBuffer); - taosTZfree(pHelper->compBuffer); - tsdbDestroyHelperFile(pHelper); - tsdbDestroyHelperTable(pHelper); - tsdbDestroyHelperBlock(pHelper); - memset((void *)pHelper, 0, sizeof(*pHelper)); - } -} - -void tsdbResetHelper(SRWHelper *pHelper) { - if (pHelper) { - // Reset the block part - tsdbResetHelperBlockImpl(pHelper); - - // Reset the table part - tsdbResetHelperTableImpl(pHelper); - - // Reset the file part - tsdbCloseHelperFile(pHelper, false, NULL); - tsdbResetHelperFileImpl(pHelper); - - pHelper->state = TSDB_HELPER_CLEAR_STATE; - } -} - -int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) { - ASSERT(pHelper != NULL && pGroup != NULL); - SFile * pFile = NULL; - STsdbRepo *pRepo = pHelper->pRepo; - char fname[TSDB_FILENAME_LEN] = "\0"; - int level = pGroup->files[0].file.level; - int id = pGroup->files[0].file.id; - - // Clear the helper object - tsdbResetHelper(pHelper); - - ASSERT(pHelper->state == TSDB_HELPER_CLEAR_STATE); - - // Set the files - pHelper->files.fGroup = *pGroup; - - // Open the files - if (tsdbOpenFile(helperHeadF(pHelper), O_RDONLY) < 0) return -1; - if (helperType(pHelper) == TSDB_WRITE_HELPER) { - if (tsdbOpenFile(helperDataF(pHelper), O_RDWR) < 0) return -1; - if (tsdbOpenFile(helperLastF(pHelper), O_RDWR) < 0) return -1; - - // Create and open .h - pFile = helperNewHeadF(pHelper); - pFile->fd = -1; - pFile->info.size = TSDB_FILE_HEAD_SIZE; - pFile->info.magic = TSDB_FILE_INIT_MAGIC; - tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), pGroup->fileId, TSDB_FILE_TYPE_NHEAD, fname); - tfsInitFile(&(pFile->file), level, id, fname); - // TODO: not allow it the increase 1 - if (tsdbOpenFile(pFile, O_WRONLY | O_CREAT) < 0) return -1; - if (tsdbUpdateFileHeader(pFile) < 0) return -1; - - // Create and open .l file if should - if (tsdbShouldCreateNewLast(pHelper)) { - pFile = helperNewLastF(pHelper); - pFile->fd = -1; - pFile->info.size = TSDB_FILE_HEAD_SIZE; - pFile->info.magic = TSDB_FILE_INIT_MAGIC; - tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), pGroup->fileId, TSDB_FILE_TYPE_NHEAD, fname); - tfsInitFile(&(pFile->file), level, id, fname); - // TODO: not allow it the increase 1 - if (tsdbOpenFile(pFile, O_WRONLY | O_CREAT) < 0) return -1; - if (tsdbUpdateFileHeader(pFile) < 0) return -1; - } - } else { - if (tsdbOpenFile(helperDataF(pHelper), O_RDONLY) < 0) return -1; - if (tsdbOpenFile(helperLastF(pHelper), O_RDONLY) < 0) return -1; - } - - helperSetState(pHelper, TSDB_HELPER_FILE_SET_AND_OPEN); - - return 0; -} - -int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError, SFileGroup *pGroup) { - SFile *pFile = NULL; - - pFile = helperHeadF(pHelper); - tsdbCloseFile(pFile); - - pFile = helperDataF(pHelper); - if (pFile->fd > 0) { - if (helperType(pHelper) == TSDB_WRITE_HELPER) { - if (!hasError) { - tsdbUpdateFileHeader(pFile); - } else { - ASSERT(pGroup != NULL); - taosFtruncate(pFile->fd, pGroup->files[TSDB_FILE_TYPE_DATA].info.size); - } - fsync(pFile->fd); - } - tsdbCloseFile(pFile); - } - - pFile = helperLastF(pHelper); - if (pFile->fd > 0) { - if (helperType(pHelper) == TSDB_WRITE_HELPER && !TSDB_NLAST_FILE_OPENED(pHelper)) { - if (!hasError) { - tsdbUpdateFileHeader(pFile); - } else { - ASSERT(pGroup != NULL); - taosFtruncate(pFile->fd, pGroup->files[TSDB_FILE_TYPE_LAST].info.size); - } - fsync(pFile->fd); - } - tsdbCloseFile(pFile); - } - - if (helperType(pHelper) == TSDB_WRITE_HELPER) { - pFile = helperNewHeadF(pHelper); - if (pFile->fd > 0) { - if (!hasError) { - tsdbUpdateFileHeader(pFile); - fsync(pFile->fd); - } - tsdbCloseFile(pFile); - if (hasError) { - tfsremove(&(pFile->file)); - } - } - - pFile = helperNewLastF(pHelper); - if (pFile->fd > 0) { - if (!hasError) { - tsdbUpdateFileHeader(pFile); - fsync(pFile->fd); - } - tsdbCloseFile(pFile); - if (hasError) { - tfsremove(&(pFile->file)); - } - } - } - return 0; -} - -int tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) { - ASSERT(helperHasState(pHelper, TSDB_HELPER_FILE_SET_AND_OPEN | TSDB_HELPER_IDX_LOAD)); - - // Clear members and state used by previous table - tsdbResetHelperTable(pHelper); - ASSERT(helperHasState(pHelper, (TSDB_HELPER_FILE_SET_AND_OPEN | TSDB_HELPER_IDX_LOAD))); - - pHelper->tableInfo.tid = pTable->tableId.tid; - pHelper->tableInfo.uid = pTable->tableId.uid; - STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1); - - if (tdInitDataCols(pHelper->pDataCols[0], pSchema) < 0) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - - if (tdInitDataCols(pHelper->pDataCols[1], pSchema) < 0) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - - if (pHelper->idxH.numOfIdx > 0) { - while (true) { - if (pHelper->idxH.curIdx >= pHelper->idxH.numOfIdx) { - memset(&(pHelper->curCompIdx), 0, sizeof(SBlockIdx)); - break; - } - - SBlockIdx *pIdx = &(pHelper->idxH.pIdxArray[pHelper->idxH.curIdx]); - if (pIdx->tid == TABLE_TID(pTable)) { - if (pIdx->uid == TABLE_UID(pTable)) { - pHelper->curCompIdx = *pIdx; - } else { - memset(&(pHelper->curCompIdx), 0, sizeof(SBlockIdx)); - } - pHelper->idxH.curIdx++; - break; - } else if (pIdx->tid > TABLE_TID(pTable)) { - memset(&(pHelper->curCompIdx), 0, sizeof(SBlockIdx)); - break; - } else { - pHelper->idxH.curIdx++; - } - } - } else { - memset(&(pHelper->curCompIdx), 0, sizeof(SBlockIdx)); - } - - if (helperType(pHelper) == TSDB_WRITE_HELPER && pHelper->curCompIdx.hasLast) { - pHelper->hasOldLastBlock = true; - } - - helperSetState(pHelper, TSDB_HELPER_TABLE_SET); - ASSERT(pHelper->state == ((TSDB_HELPER_TABLE_SET << 1) - 1)); - - return 0; -} - -int tsdbCommitTableData(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey) { - ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER); - - SBlockIdx *pIdx = &(pHelper->curCompIdx); - int blkIdx = 0; - - ASSERT(pIdx->offset == 0 || pIdx->uid == TABLE_UID(pCommitIter->pTable)); - if (tsdbLoadCompInfo(pHelper, NULL) < 0) return -1; - - while (true) { - ASSERT(blkIdx <= (int)pIdx->numOfBlocks); - TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter); - if (keyFirst == TSDB_DATA_TIMESTAMP_NULL || keyFirst > maxKey) break; // iter over - - if (pIdx->len <= 0 || keyFirst > pIdx->maxKey) { - if (tsdbProcessAppendCommit(pHelper, pCommitIter, pDataCols, maxKey) < 0) return -1; - blkIdx = pIdx->numOfBlocks; - } else { - if (tsdbProcessMergeCommit(pHelper, pCommitIter, pDataCols, maxKey, &blkIdx) < 0) return -1; - } - } - - return 0; -} - -int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) { - STsdbCfg *pCfg = &pHelper->pRepo->config; - - ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER); - SBlockIdx * pIdx = &(pHelper->curCompIdx); - SBlock compBlock = {0}; - if (TSDB_NLAST_FILE_OPENED(pHelper) && (pHelper->hasOldLastBlock)) { - if (tsdbLoadCompInfo(pHelper, NULL) < 0) return -1; - - SBlock *pCompBlock = blockAtIdx(pHelper, pIdx->numOfBlocks - 1); - ASSERT(pCompBlock->last); - if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; - ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows && - pHelper->pDataCols[0]->numOfRows < pCfg->minRowsPerFileBlock); - if (tsdbWriteBlockToFile(pHelper, helperNewLastF(pHelper), pHelper->pDataCols[0], &compBlock, true, true) < 0) - return -1; - - if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1; - -#if 0 - if (pCompBlock->numOfSubBlocks > 1) { - if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; - ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows && - pHelper->pDataCols[0]->numOfRows < pCfg->minRowsPerFileBlock); - if (tsdbWriteBlockToFile(pHelper, helperNewLastF(pHelper), pHelper->pDataCols[0], &compBlock, true, true) < 0) - return -1; - - if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1; - } else { - if (lseek(helperLastF(pHelper)->fd, pCompBlock->offset, SEEK_SET) < 0) { - tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), helperLastF(pHelper)->fname, - strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - pCompBlock->offset = lseek(helperNewLastF(pHelper)->fd, 0, SEEK_END); - if (pCompBlock->offset < 0) { - tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), helperNewLastF(pHelper)->fname, - strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - if (taosSendFile(helperNewLastF(pHelper)->fd, helperLastF(pHelper)->fd, NULL, pCompBlock->len) < pCompBlock->len) { - tsdbError("vgId:%d failed to sendfile from file %s to file %s since %s", REPO_ID(pHelper->pRepo), - helperLastF(pHelper)->fname, helperNewLastF(pHelper)->fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - } -#endif - - pHelper->hasOldLastBlock = false; - } - - return 0; -} - -int tsdbWriteCompInfo(SRWHelper *pHelper) { - SBlockIdx *pIdx = &(pHelper->curCompIdx); - off_t offset = 0; - SFile * pFile = helperNewHeadF(pHelper); - - if (pIdx->len > 0) { - if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) { - if (tsdbLoadCompInfo(pHelper, NULL) < 0) return -1; - } else { - pHelper->pCompInfo->delimiter = TSDB_FILE_DELIMITER; - pHelper->pCompInfo->uid = pHelper->tableInfo.uid; - pHelper->pCompInfo->tid = pHelper->tableInfo.tid; - ASSERT(pIdx->len > sizeof(SBlockInfo) + sizeof(TSCKSUM) && - (pIdx->len - sizeof(SBlockInfo) - sizeof(TSCKSUM)) % sizeof(SBlock) == 0); - taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompInfo, pIdx->len); - } - - pFile->info.magic = taosCalcChecksum( - pFile->info.magic, (uint8_t *)POINTER_SHIFT(pHelper->pCompInfo, pIdx->len - sizeof(TSCKSUM)), sizeof(TSCKSUM)); - offset = lseek(pFile->fd, 0, SEEK_END); - if (offset < 0) { - tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), TSDB_FILE_NAME(pFile), strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - pIdx->offset = offset; - pIdx->uid = pHelper->tableInfo.uid; - pIdx->tid = pHelper->tableInfo.tid; - ASSERT(pIdx->offset >= TSDB_FILE_HEAD_SIZE); - - if (taosWrite(pFile->fd, (void *)(pHelper->pCompInfo), pIdx->len) < (int)pIdx->len) { - tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), pIdx->len, - TSDB_FILE_NAME(pFile), strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - if (taosTSizeof(pHelper->pWIdx) < pFile->info.len + sizeof(SBlockIdx) + 12) { - pHelper->pWIdx = taosTRealloc(pHelper->pWIdx, taosTSizeof(pHelper->pWIdx) == 0 ? 1024 : taosTSizeof(pHelper->pWIdx) * 2); - if (pHelper->pWIdx == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - } - - void *pBuf = POINTER_SHIFT(pHelper->pWIdx, pFile->info.len); - pFile->info.len += tsdbEncodeSBlockIdx(&pBuf, &(pHelper->curCompIdx)); - - pFile->info.size += pIdx->len; - // ASSERT(pFile->info.size == lseek(pFile->fd, 0, SEEK_CUR)); - } - - return 0; -} - -int tsdbWriteCompIdx(SRWHelper *pHelper) { - ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER); - off_t offset = 0; - - SFile *pFile = helperNewHeadF(pHelper); - - pFile->info.len += sizeof(TSCKSUM); - if (taosTSizeof(pHelper->pWIdx) < pFile->info.len) { - pHelper->pWIdx = taosTRealloc(pHelper->pWIdx, pFile->info.len); - if (pHelper->pWIdx == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - } - taosCalcChecksumAppend(0, (uint8_t *)pHelper->pWIdx, pFile->info.len); - pFile->info.magic = taosCalcChecksum( - pFile->info.magic, (uint8_t *)POINTER_SHIFT(pHelper->pWIdx, pFile->info.len - sizeof(TSCKSUM)), sizeof(TSCKSUM)); - - offset = lseek(pFile->fd, 0, SEEK_END); - if (offset < 0) { - tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), TSDB_FILE_NAME(pFile), strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - ASSERT(offset == pFile->info.size); - - if (taosWrite(pFile->fd, (void *)pHelper->pWIdx, pFile->info.len) < (int)pFile->info.len) { - tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), pFile->info.len, - TSDB_FILE_NAME(pFile), strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - pFile->info.offset = offset; - pFile->info.size += pFile->info.len; - // ASSERT(pFile->info.size == lseek(pFile->fd, 0, SEEK_CUR)); - - return 0; -} - -int tsdbLoadCompIdxImpl(SFile *pFile, uint32_t offset, uint32_t len, void *buffer) { - const char *prefixMsg = "failed to load SBlockIdx part"; - if (lseek(pFile->fd, offset, SEEK_SET) < 0) { - tsdbError("%s: seek to file %s offset %u failed since %s", prefixMsg, TSDB_FILE_NAME(pFile), offset, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - if (taosRead(pFile->fd, buffer, len) < len) { - tsdbError("%s: read file %s offset %u len %u failed since %s", prefixMsg, TSDB_FILE_NAME(pFile), offset, len, - strerror(errno)); - terrno = TSDB_CODE_TDB_FILE_CORRUPTED; - return -1; - } - - if (!taosCheckChecksumWhole((uint8_t *)buffer, len)) { - tsdbError("%s: file %s corrupted, offset %u len %u", prefixMsg, TSDB_FILE_NAME(pFile), offset, len); - terrno = TSDB_CODE_TDB_FILE_CORRUPTED; - return -1; - } - - return 0; -} - -int tsdbDecodeSBlockIdxImpl(void *buffer, uint32_t len, SBlockIdx **ppCompIdx, int *numOfIdx) { - int nIdx = 0; - void *pPtr = buffer; - - while (POINTER_DISTANCE(pPtr, buffer) < (int)(len - sizeof(TSCKSUM))) { - size_t tlen = taosTSizeof(*ppCompIdx); - if (tlen < sizeof(SBlockIdx) * (nIdx + 1)) { - *ppCompIdx = (SBlockIdx *)taosTRealloc(*ppCompIdx, (tlen == 0) ? 1024 : tlen * 2); - if (*ppCompIdx == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - } - - pPtr = tsdbDecodeSBlockIdx(pPtr, &((*ppCompIdx)[nIdx])); - if (pPtr == NULL) { - tsdbError("failed to decode SBlockIdx part, idx:%d", nIdx); - terrno = TSDB_CODE_TDB_FILE_CORRUPTED; - return -1; - } - - nIdx++; - - ASSERT(nIdx == 1 || (*ppCompIdx)[nIdx - 1].tid > (*ppCompIdx)[nIdx - 2].tid); - ASSERT(POINTER_DISTANCE(pPtr, buffer) <= (int)(len - sizeof(TSCKSUM))); - } - - *numOfIdx = nIdx; - return 0; -} - -int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) { - ASSERT(pHelper->state == TSDB_HELPER_FILE_SET_AND_OPEN); - SFile *pFile = helperHeadF(pHelper); - - if (!helperHasState(pHelper, TSDB_HELPER_IDX_LOAD)) { - // If not load from file, just load it in object - if (pFile->info.len > 0) { - if ((pHelper->pBuffer = taosTRealloc(pHelper->pBuffer, pFile->info.len)) == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - - // Load SBlockIdx binary from file - if (tsdbLoadCompIdxImpl(pFile, pFile->info.offset, pFile->info.len, (void *)(pHelper->pBuffer)) < 0) { - return -1; - } - - // Decode the SBlockIdx part - if (tsdbDecodeSBlockIdxImpl(pHelper->pBuffer, pFile->info.len, &(pHelper->idxH.pIdxArray), - &(pHelper->idxH.numOfIdx)) < 0) { - tsdbError("vgId:%d failed to decode SBlockIdx part from file %s since %s", REPO_ID(pHelper->pRepo), TSDB_FILE_NAME(pFile), - tstrerror(errno)); - return -1; - } - } - } - helperSetState(pHelper, TSDB_HELPER_IDX_LOAD); - - // Copy the memory for outside usage - if (target && pHelper->idxH.numOfIdx > 0) - memcpy(target, pHelper->idxH.pIdxArray, sizeof(SBlockIdx) * pHelper->idxH.numOfIdx); - - return 0; -} - -int tsdbLoadCompInfoImpl(SFile *pFile, SBlockIdx *pIdx, SBlockInfo **ppCompInfo) { - const char *prefixMsg = "failed to load SBlockInfo/SBlock part"; - - if (lseek(pFile->fd, pIdx->offset, SEEK_SET) < 0) { - tsdbError("%s: seek to file %s offset %u failed since %s", prefixMsg, TSDB_FILE_NAME(pFile), pIdx->offset, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - *ppCompInfo = taosTRealloc((void *)(*ppCompInfo), pIdx->len); - if (*ppCompInfo == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - - if (taosRead(pFile->fd, (void *)(*ppCompInfo), pIdx->len) < (int)pIdx->len) { - tsdbError("%s: read file %s offset %u len %u failed since %s", prefixMsg, TSDB_FILE_NAME(pFile), pIdx->offset, pIdx->len, - strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - if (!taosCheckChecksumWhole((uint8_t *)(*ppCompInfo), pIdx->len)) { - tsdbError("%s: file %s corrupted, offset %u len %u", prefixMsg, TSDB_FILE_NAME(pFile), pIdx->offset, pIdx->len); - terrno = TSDB_CODE_TDB_FILE_CORRUPTED; - return -1; - } - - return 0; -} - -int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) { - ASSERT(helperHasState(pHelper, TSDB_HELPER_TABLE_SET)); - - SBlockIdx *pIdx = &(pHelper->curCompIdx); - - SFile *pFile = helperHeadF(pHelper); - - if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) { - if (pIdx->offset > 0) { - ASSERT(pIdx->uid == pHelper->tableInfo.uid); - - if (tsdbLoadCompInfoImpl(pFile, pIdx, &(pHelper->pCompInfo)) < 0) return -1; - - ASSERT(pIdx->uid == pHelper->pCompInfo->uid && pIdx->tid == pHelper->pCompInfo->tid); - } - - helperSetState(pHelper, TSDB_HELPER_INFO_LOAD); - } - - if (target) memcpy(target, (void *)(pHelper->pCompInfo), pIdx->len); - - return 0; -} - -int tsdbLoadCompData(SRWHelper *pHelper, SBlock *pCompBlock, void *target) { - ASSERT(pCompBlock->numOfSubBlocks <= 1); - SFile *pFile = (pCompBlock->last) ? helperLastF(pHelper) : helperDataF(pHelper); - - if (lseek(pFile->fd, (off_t)pCompBlock->offset, SEEK_SET) < 0) { - tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), TSDB_FILE_NAME(pFile), strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - size_t tsize = TSDB_GET_COMPCOL_LEN(pCompBlock->numOfCols); - pHelper->pCompData = taosTRealloc((void *)pHelper->pCompData, tsize); - if (pHelper->pCompData == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - - if (taosRead(pFile->fd, (void *)pHelper->pCompData, tsize) < tsize) { - tsdbError("vgId:%d failed to read %" PRIzu " bytes from file %s since %s", REPO_ID(pHelper->pRepo), tsize, TSDB_FILE_NAME(pFile), - strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - if (!taosCheckChecksumWhole((uint8_t *)pHelper->pCompData, (uint32_t)tsize)) { - tsdbError("vgId:%d file %s is broken, offset %" PRId64 " size %" PRIzu "", REPO_ID(pHelper->pRepo), TSDB_FILE_NAME(pFile), - (int64_t)pCompBlock->offset, tsize); - terrno = TSDB_CODE_TDB_FILE_CORRUPTED; - return -1; - } - - ASSERT(pCompBlock->numOfCols == pHelper->pCompData->numOfCols); - - if (target) memcpy(target, pHelper->pCompData, tsize); - - return 0; -} - -void tsdbGetDataStatis(SRWHelper *pHelper, SDataStatis *pStatis, int numOfCols) { - SBlockData *pCompData = pHelper->pCompData; - - for (int i = 0, j = 0; i < numOfCols;) { - if (j >= pCompData->numOfCols) { - pStatis[i].numOfNull = -1; - i++; - continue; - } - - if (pStatis[i].colId == pCompData->cols[j].colId) { - pStatis[i].sum = pCompData->cols[j].sum; - pStatis[i].max = pCompData->cols[j].max; - pStatis[i].min = pCompData->cols[j].min; - pStatis[i].maxIndex = pCompData->cols[j].maxIndex; - pStatis[i].minIndex = pCompData->cols[j].minIndex; - pStatis[i].numOfNull = pCompData->cols[j].numOfNull; - i++; - j++; - } else if (pStatis[i].colId < pCompData->cols[j].colId) { - pStatis[i].numOfNull = -1; - i++; - } else { - j++; - } - } -} - -int tsdbLoadBlockDataCols(SRWHelper *pHelper, SBlock *pCompBlock, SBlockInfo *pCompInfo, int16_t *colIds, int numOfColIds) { - ASSERT(pCompBlock->numOfSubBlocks >= 1); // Must be super block - SBlock *pTCompBlock = pCompBlock; - - int numOfSubBlocks = pCompBlock->numOfSubBlocks; - if (numOfSubBlocks > 1) - pTCompBlock = (SBlock *)POINTER_SHIFT((pCompInfo == NULL) ? pHelper->pCompInfo : pCompInfo, pCompBlock->offset); - - tdResetDataCols(pHelper->pDataCols[0]); - if (tsdbLoadBlockDataColsImpl(pHelper, pTCompBlock, pHelper->pDataCols[0], colIds, numOfColIds) < 0) goto _err; - for (int i = 1; i < numOfSubBlocks; i++) { - tdResetDataCols(pHelper->pDataCols[1]); - pTCompBlock++; - if (tsdbLoadBlockDataColsImpl(pHelper, pTCompBlock, pHelper->pDataCols[1], colIds, numOfColIds) < 0) goto _err; - if (tdMergeDataCols(pHelper->pDataCols[0], pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfRows) < 0) goto _err; - } - - ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows && - dataColsKeyFirst(pHelper->pDataCols[0]) == pCompBlock->keyFirst && - dataColsKeyLast(pHelper->pDataCols[0]) == pCompBlock->keyLast); - - return 0; - -_err: - return -1; -} - -int tsdbLoadBlockData(SRWHelper *pHelper, SBlock *pCompBlock, SBlockInfo *pCompInfo) { - SBlock *pTCompBlock = pCompBlock; - - int numOfSubBlock = pCompBlock->numOfSubBlocks; - if (numOfSubBlock > 1) - pTCompBlock = (SBlock *)POINTER_SHIFT((pCompInfo == NULL) ? pHelper->pCompInfo : pCompInfo, pCompBlock->offset); - - tdResetDataCols(pHelper->pDataCols[0]); - if (tsdbLoadBlockDataImpl(pHelper, pTCompBlock, pHelper->pDataCols[0]) < 0) goto _err; - for (int i = 1; i < numOfSubBlock; i++) { - tdResetDataCols(pHelper->pDataCols[1]); - pTCompBlock++; - if (tsdbLoadBlockDataImpl(pHelper, pTCompBlock, pHelper->pDataCols[1]) < 0) goto _err; - if (tdMergeDataCols(pHelper->pDataCols[0], pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfRows) < 0) goto _err; - } - - ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows && - dataColsKeyFirst(pHelper->pDataCols[0]) == pCompBlock->keyFirst && - dataColsKeyLast(pHelper->pDataCols[0]) == pCompBlock->keyLast); - - return 0; - -_err: - return -1; -} - -// ---------------------- INTERNAL FUNCTIONS ---------------------- -static bool tsdbShouldCreateNewLast(SRWHelper *pHelper) { - ASSERT(helperLastF(pHelper)->fd > 0); - struct stat st; - if (fstat(helperLastF(pHelper)->fd, &st) < 0) return true; - if (st.st_size > 32 * 1024 + TSDB_FILE_HEAD_SIZE) return true; - return false; -} - -static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, SBlock *pCompBlock, - bool isLast, bool isSuperBlock) { - STsdbCfg * pCfg = &(pHelper->pRepo->config); - SBlockData *pCompData = (SBlockData *)(pHelper->pBuffer); - int64_t offset = 0; - int rowsToWrite = pDataCols->numOfRows; - - ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock); - ASSERT(isLast ? rowsToWrite < pCfg->minRowsPerFileBlock : true); - - offset = lseek(pFile->fd, 0, SEEK_END); - if (offset < 0) { - tsdbError("vgId:%d failed to write block to file %s since %s", REPO_ID(pHelper->pRepo), TSDB_FILE_NAME(pFile), - strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - int nColsNotAllNull = 0; - for (int ncol = 1; ncol < pDataCols->numOfCols; ncol++) { // ncol from 1, we skip the timestamp column - SDataCol *pDataCol = pDataCols->cols + ncol; - SBlockCol *pCompCol = pCompData->cols + nColsNotAllNull; - - if (isNEleNull(pDataCol, rowsToWrite)) { // all data to commit are NULL, just ignore it - continue; - } - - memset(pCompCol, 0, sizeof(*pCompCol)); - - pCompCol->colId = pDataCol->colId; - pCompCol->type = pDataCol->type; - if (tDataTypeDesc[pDataCol->type].getStatisFunc) { - (*tDataTypeDesc[pDataCol->type].getStatisFunc)( - (TSKEY *)(pDataCols->cols[0].pData), pDataCol->pData, rowsToWrite, &(pCompCol->min), &(pCompCol->max), - &(pCompCol->sum), &(pCompCol->minIndex), &(pCompCol->maxIndex), &(pCompCol->numOfNull)); - } - nColsNotAllNull++; - } - - ASSERT(nColsNotAllNull >= 0 && nColsNotAllNull <= pDataCols->numOfCols); - - // Compress the data if neccessary - int tcol = 0; - int32_t toffset = 0; - int32_t tsize = TSDB_GET_COMPCOL_LEN(nColsNotAllNull); - int32_t lsize = tsize; - int32_t keyLen = 0; - for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) { - if (ncol != 0 && tcol >= nColsNotAllNull) break; - - SDataCol *pDataCol = pDataCols->cols + ncol; - SBlockCol *pCompCol = pCompData->cols + tcol; - - if (ncol != 0 && (pDataCol->colId != pCompCol->colId)) continue; - void *tptr = POINTER_SHIFT(pCompData, lsize); - - int32_t flen = 0; // final length - int32_t tlen = dataColGetNEleLen(pDataCol, rowsToWrite); - - if (pCfg->compression) { - if (pCfg->compression == TWO_STAGE_COMP) { - pHelper->compBuffer = taosTRealloc(pHelper->compBuffer, tlen + COMP_OVERFLOW_BYTES); - if (pHelper->compBuffer == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - goto _err; - } - } - - flen = (*(tDataTypeDesc[pDataCol->type].compFunc))((char *)pDataCol->pData, tlen, rowsToWrite, tptr, - (int32_t)taosTSizeof(pHelper->pBuffer) - lsize, pCfg->compression, - pHelper->compBuffer, (int32_t)taosTSizeof(pHelper->compBuffer)); - } else { - flen = tlen; - memcpy(tptr, pDataCol->pData, flen); - } - - // Add checksum - ASSERT(flen > 0); - flen += sizeof(TSCKSUM); - taosCalcChecksumAppend(0, (uint8_t *)tptr, flen); - pFile->info.magic = - taosCalcChecksum(pFile->info.magic, (uint8_t *)POINTER_SHIFT(tptr, flen - sizeof(TSCKSUM)), sizeof(TSCKSUM)); - - if (ncol != 0) { - pCompCol->offset = toffset; - pCompCol->len = flen; - tcol++; - } else { - keyLen = flen; - } - - toffset += flen; - lsize += flen; - } - - pCompData->delimiter = TSDB_FILE_DELIMITER; - pCompData->uid = pHelper->tableInfo.uid; - pCompData->numOfCols = nColsNotAllNull; - - taosCalcChecksumAppend(0, (uint8_t *)pCompData, tsize); - pFile->info.magic = taosCalcChecksum(pFile->info.magic, (uint8_t *)POINTER_SHIFT(pCompData, tsize - sizeof(TSCKSUM)), - sizeof(TSCKSUM)); - - // Write the whole block to file - if (taosWrite(pFile->fd, (void *)pCompData, lsize) < lsize) { - tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(helperRepo(pHelper)), lsize, TSDB_FILE_NAME(pFile), - strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - // Update pCompBlock membership vairables - pCompBlock->last = isLast; - pCompBlock->offset = offset; - pCompBlock->algorithm = pCfg->compression; - pCompBlock->numOfRows = rowsToWrite; - pCompBlock->len = lsize; - pCompBlock->keyLen = keyLen; - pCompBlock->numOfSubBlocks = isSuperBlock ? 1 : 0; - pCompBlock->numOfCols = nColsNotAllNull; - pCompBlock->keyFirst = dataColsKeyFirst(pDataCols); - pCompBlock->keyLast = dataColsKeyAt(pDataCols, rowsToWrite - 1); - - tsdbDebug("vgId:%d tid:%d a block of data is written to file %s, offset %" PRId64 - " numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64, - REPO_ID(helperRepo(pHelper)), pHelper->tableInfo.tid, TSDB_FILE_NAME(pFile), (int64_t)(pCompBlock->offset), - (int)(pCompBlock->numOfRows), pCompBlock->len, pCompBlock->numOfCols, pCompBlock->keyFirst, - pCompBlock->keyLast); - - pFile->info.size += pCompBlock->len; - // ASSERT(pFile->info.size == lseek(pFile->fd, 0, SEEK_CUR)); - - return 0; - -_err: - return -1; -} - -static int compareKeyBlock(const void *arg1, const void *arg2) { - TSKEY key = *(TSKEY *)arg1; - SBlock *pBlock = (SBlock *)arg2; - - if (key < pBlock->keyFirst) { - return -1; - } else if (key > pBlock->keyLast) { - return 1; - } - - return 0; -} - -static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize) { - if (taosTSizeof((void *)pHelper->pCompInfo) <= esize) { - size_t tsize = esize + sizeof(SBlock) * 16; - pHelper->pCompInfo = (SBlockInfo *)taosTRealloc(pHelper->pCompInfo, tsize); - if (pHelper->pCompInfo == NULL) return -1; - } - - return 0; -} - -static int tsdbInsertSuperBlock(SRWHelper *pHelper, SBlock *pCompBlock, int blkIdx) { - SBlockIdx *pIdx = &(pHelper->curCompIdx); - - ASSERT(blkIdx >= 0 && blkIdx <= (int)pIdx->numOfBlocks); - ASSERT(pCompBlock->numOfSubBlocks == 1); - - // Adjust memory if no more room - if (pIdx->len == 0) pIdx->len = sizeof(SBlockInfo) + sizeof(TSCKSUM); - if (tsdbAdjustInfoSizeIfNeeded(pHelper, pIdx->len + sizeof(SBlockInfo)) < 0) goto _err; - - // Change the offset - for (uint32_t i = 0; i < pIdx->numOfBlocks; i++) { - SBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i]; - if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += sizeof(SBlock); - } - - // Memmove if needed - int tsize = pIdx->len - (sizeof(SBlockInfo) + sizeof(SBlock) * blkIdx); - if (tsize > 0) { - ASSERT(sizeof(SBlockInfo) + sizeof(SBlock) * (blkIdx + 1) < taosTSizeof(pHelper->pCompInfo)); - ASSERT(sizeof(SBlockInfo) + sizeof(SBlock) * (blkIdx + 1) + tsize <= taosTSizeof(pHelper->pCompInfo)); - memmove(POINTER_SHIFT(pHelper->pCompInfo, sizeof(SBlockInfo) + sizeof(SBlock) * (blkIdx + 1)), - POINTER_SHIFT(pHelper->pCompInfo, sizeof(SBlockInfo) + sizeof(SBlock) * blkIdx), tsize); - } - pHelper->pCompInfo->blocks[blkIdx] = *pCompBlock; - - pIdx->numOfBlocks++; - pIdx->len += sizeof(SBlock); - ASSERT(pIdx->len <= taosTSizeof(pHelper->pCompInfo)); - pIdx->maxKey = blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->keyLast; - pIdx->hasLast = (uint32_t)blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->last; - - if (pIdx->numOfBlocks > 1) { - ASSERT(pHelper->pCompInfo->blocks[0].keyLast < pHelper->pCompInfo->blocks[1].keyFirst); - } - - ASSERT((blkIdx == pIdx->numOfBlocks -1) || (!pCompBlock->last)); - - tsdbDebug("vgId:%d tid:%d a super block is inserted at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid, - blkIdx); - - return 0; - -_err: - return -1; -} - -static int tsdbAddSubBlock(SRWHelper *pHelper, SBlock *pCompBlock, int blkIdx, SMergeInfo *pMergeInfo) { - ASSERT(pCompBlock->numOfSubBlocks == 0); - - SBlockIdx *pIdx = &(pHelper->curCompIdx); - ASSERT(blkIdx >= 0 && blkIdx < (int)pIdx->numOfBlocks); - - SBlock *pSBlock = pHelper->pCompInfo->blocks + blkIdx; - ASSERT(pSBlock->numOfSubBlocks >= 1 && pSBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS); - - size_t spaceNeeded = - (pSBlock->numOfSubBlocks == 1) ? pIdx->len + sizeof(SBlock) * 2 : pIdx->len + sizeof(SBlock); - if (tsdbAdjustInfoSizeIfNeeded(pHelper, spaceNeeded) < 0) goto _err; - - pSBlock = pHelper->pCompInfo->blocks + blkIdx; - - // Add the sub-block - if (pSBlock->numOfSubBlocks > 1) { - size_t tsize = (size_t)(pIdx->len - (pSBlock->offset + pSBlock->len)); - if (tsize > 0) { - memmove((void *)((char *)(pHelper->pCompInfo) + pSBlock->offset + pSBlock->len + sizeof(SBlock)), - (void *)((char *)(pHelper->pCompInfo) + pSBlock->offset + pSBlock->len), tsize); - - for (uint32_t i = blkIdx + 1; i < pIdx->numOfBlocks; i++) { - SBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i]; - if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += sizeof(SBlock); - } - } - - *(SBlock *)((char *)(pHelper->pCompInfo) + pSBlock->offset + pSBlock->len) = *pCompBlock; - - pSBlock->numOfSubBlocks++; - ASSERT(pSBlock->numOfSubBlocks <= TSDB_MAX_SUBBLOCKS); - pSBlock->len += sizeof(SBlock); - pSBlock->numOfRows = pSBlock->numOfRows + pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed; - pSBlock->keyFirst = pMergeInfo->keyFirst; - pSBlock->keyLast = pMergeInfo->keyLast; - pIdx->len += sizeof(SBlock); - } else { // Need to create two sub-blocks - void *ptr = NULL; - for (uint32_t i = blkIdx + 1; i < pIdx->numOfBlocks; i++) { - SBlock *pTCompBlock = pHelper->pCompInfo->blocks + i; - if (pTCompBlock->numOfSubBlocks > 1) { - ptr = POINTER_SHIFT(pHelper->pCompInfo, pTCompBlock->offset); - break; - } - } - - if (ptr == NULL) ptr = POINTER_SHIFT(pHelper->pCompInfo, pIdx->len - sizeof(TSCKSUM)); - - size_t tsize = pIdx->len - ((char *)ptr - (char *)(pHelper->pCompInfo)); - if (tsize > 0) { - memmove(POINTER_SHIFT(ptr, sizeof(SBlock) * 2), ptr, tsize); - for (uint32_t i = blkIdx + 1; i < pIdx->numOfBlocks; i++) { - SBlock *pTCompBlock = pHelper->pCompInfo->blocks + i; - if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += (sizeof(SBlock) * 2); - } - } - - ((SBlock *)ptr)[0] = *pSBlock; - ((SBlock *)ptr)[0].numOfSubBlocks = 0; - - ((SBlock *)ptr)[1] = *pCompBlock; - - pSBlock->numOfSubBlocks = 2; - pSBlock->numOfRows = pSBlock->numOfRows + pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed; - pSBlock->offset = ((char *)ptr) - ((char *)pHelper->pCompInfo); - pSBlock->len = sizeof(SBlock) * 2; - pSBlock->keyFirst = pMergeInfo->keyFirst; - pSBlock->keyLast = pMergeInfo->keyLast; - - pIdx->len += (sizeof(SBlock) * 2); - } - - pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].keyLast; - pIdx->hasLast = (uint32_t)pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].last; - - tsdbDebug("vgId:%d tid:%d a subblock is added at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid, blkIdx); - - return 0; - -_err: - return -1; -} - -static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SBlock *pCompBlock, int blkIdx) { - ASSERT(pCompBlock->numOfSubBlocks == 1); - - SBlockIdx *pIdx = &(pHelper->curCompIdx); - - ASSERT(blkIdx >= 0 && blkIdx < (int)pIdx->numOfBlocks); - - SBlock *pSBlock = pHelper->pCompInfo->blocks + blkIdx; - - ASSERT(pSBlock->numOfSubBlocks >= 1); - - // Delete the sub blocks it has - if (pSBlock->numOfSubBlocks > 1) { - size_t tsize = (size_t)(pIdx->len - (pSBlock->offset + pSBlock->len)); - if (tsize > 0) { - memmove(POINTER_SHIFT(pHelper->pCompInfo, pSBlock->offset), - POINTER_SHIFT(pHelper->pCompInfo, pSBlock->offset + pSBlock->len), tsize); - } - - for (uint32_t i = blkIdx + 1; i < pIdx->numOfBlocks; i++) { - SBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i]; - if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset -= (sizeof(SBlock) * pSBlock->numOfSubBlocks); - } - - pIdx->len -= (sizeof(SBlock) * pSBlock->numOfSubBlocks); - } - - *pSBlock = *pCompBlock; - - pIdx->maxKey = blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->keyLast; - pIdx->hasLast = (uint32_t)blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->last; - - ASSERT((blkIdx == pIdx->numOfBlocks-1) || (!pCompBlock->last)); - - tsdbDebug("vgId:%d tid:%d a super block is updated at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid, - blkIdx); - - return 0; -} - -static int tsdbDeleteSuperBlock(SRWHelper *pHelper, int blkIdx) { - SBlockIdx *pCompIdx = &(pHelper->curCompIdx); - - ASSERT(pCompIdx->numOfBlocks > 0 && blkIdx < pCompIdx->numOfBlocks); - - SBlock *pCompBlock= blockAtIdx(pHelper, blkIdx); - SBlock compBlock = *pCompBlock; - ASSERT(pCompBlock->numOfSubBlocks > 0 && pCompBlock->numOfSubBlocks <= TSDB_MAX_SUBBLOCKS); - - if (pCompIdx->numOfBlocks == 1) { - memset(pCompIdx, 0, sizeof(*pCompIdx)); - } else { - int tsize = 0; - - if (compBlock.numOfSubBlocks > 1) { - tsize = (int)(pCompIdx->len - (compBlock.offset + sizeof(SBlock) * compBlock.numOfSubBlocks)); - - ASSERT(tsize > 0); - memmove(POINTER_SHIFT(pHelper->pCompInfo, compBlock.offset), - POINTER_SHIFT(pHelper->pCompInfo, compBlock.offset + sizeof(SBlock) * compBlock.numOfSubBlocks), - tsize); - - pCompIdx->len = pCompIdx->len - sizeof(SBlock) * compBlock.numOfSubBlocks; - } - - tsize = (int)(pCompIdx->len - POINTER_DISTANCE(blockAtIdx(pHelper, blkIdx + 1), pHelper->pCompInfo)); - ASSERT(tsize > 0); - memmove((void *)blockAtIdx(pHelper, blkIdx), (void *)blockAtIdx(pHelper, blkIdx + 1), tsize); - - pCompIdx->len -= sizeof(SBlock); - - pCompIdx->numOfBlocks--; - pCompIdx->hasLast = (uint32_t)(blockAtIdx(pHelper, pCompIdx->numOfBlocks - 1)->last); - pCompIdx->maxKey = blockAtIdx(pHelper, pCompIdx->numOfBlocks - 1)->keyLast; - } - - return 0; -} - -static void tsdbResetHelperFileImpl(SRWHelper *pHelper) { - pHelper->idxH.numOfIdx = 0; - pHelper->idxH.curIdx = 0; - memset((void *)&pHelper->files, 0, sizeof(pHelper->files)); - helperHeadF(pHelper)->fd = -1; - helperDataF(pHelper)->fd = -1; - helperLastF(pHelper)->fd = -1; - helperNewHeadF(pHelper)->fd = -1; - helperNewLastF(pHelper)->fd = -1; -} - -static int tsdbInitHelperFile(SRWHelper *pHelper) { - tsdbResetHelperFileImpl(pHelper); - return 0; -} - -static void tsdbDestroyHelperFile(SRWHelper *pHelper) { - tsdbCloseHelperFile(pHelper, false, NULL); - tsdbResetHelperFileImpl(pHelper); - taosTZfree(pHelper->idxH.pIdxArray); - taosTZfree(pHelper->pWIdx); -} - -// ---------- Operations on Helper Table part -static void tsdbResetHelperTableImpl(SRWHelper *pHelper) { - memset((void *)&pHelper->tableInfo, 0, sizeof(SHelperTable)); - pHelper->hasOldLastBlock = false; -} - -static void tsdbResetHelperTable(SRWHelper *pHelper) { - tsdbResetHelperBlock(pHelper); - tsdbResetHelperTableImpl(pHelper); - helperClearState(pHelper, (TSDB_HELPER_TABLE_SET | TSDB_HELPER_INFO_LOAD)); -} - -static void tsdbInitHelperTable(SRWHelper *pHelper) { tsdbResetHelperTableImpl(pHelper); } - -static void tsdbDestroyHelperTable(SRWHelper *pHelper) { taosTZfree((void *)pHelper->pCompInfo); } - -// ---------- Operations on Helper Block part -static void tsdbResetHelperBlockImpl(SRWHelper *pHelper) { - tdResetDataCols(pHelper->pDataCols[0]); - tdResetDataCols(pHelper->pDataCols[1]); -} - -static void tsdbResetHelperBlock(SRWHelper *pHelper) { - tsdbResetHelperBlockImpl(pHelper); - // helperClearState(pHelper, TSDB_HELPER_) -} - -static int tsdbInitHelperBlock(SRWHelper *pHelper) { - STsdbRepo *pRepo = helperRepo(pHelper); - STsdbMeta *pMeta = pHelper->pRepo->tsdbMeta; - - pHelper->pDataCols[0] = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock); - pHelper->pDataCols[1] = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock); - if (pHelper->pDataCols[0] == NULL || pHelper->pDataCols[1] == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - - tsdbResetHelperBlockImpl(pHelper); - - return 0; -} - -static void tsdbDestroyHelperBlock(SRWHelper *pHelper) { - taosTZfree(pHelper->pCompData); - tdFreeDataCols(pHelper->pDataCols[0]); - tdFreeDataCols(pHelper->pDataCols[1]); -} - -static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t type) { - STsdbCfg *pCfg = &pRepo->config; - memset((void *)pHelper, 0, sizeof(*pHelper)); - STsdbMeta *pMeta = pRepo->tsdbMeta; - - helperType(pHelper) = type; - helperRepo(pHelper) = pRepo; - helperState(pHelper) = TSDB_HELPER_CLEAR_STATE; - - // Init file part - if (tsdbInitHelperFile(pHelper) < 0) goto _err; - - // Init table part - tsdbInitHelperTable(pHelper); - - // Init block part - if (tsdbInitHelperBlock(pHelper) < 0) goto _err; - - // TODO: pMeta->maxRowBytes and pMeta->maxCols may change here causing invalid write - pHelper->pBuffer = - taosTMalloc(sizeof(SBlockData) + (sizeof(SBlockCol) + sizeof(TSCKSUM) + COMP_OVERFLOW_BYTES) * pMeta->maxCols + - pMeta->maxRowBytes * pCfg->maxRowsPerFileBlock + sizeof(TSCKSUM)); - if (pHelper->pBuffer == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - goto _err; - } - - return 0; - -_err: - tsdbDestroyHelper(pHelper); - return -1; -} - -static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32_t len, int8_t comp, int numOfRows, - int maxPoints, char *buffer, int bufferSize) { - // Verify by checksum - if (!taosCheckChecksumWhole((uint8_t *)content, len)) { - terrno = TSDB_CODE_TDB_FILE_CORRUPTED; - return -1; - } - - // Decode the data - if (comp) { - // // Need to decompress - int tlen = (*(tDataTypeDesc[pDataCol->type].decompFunc))(content, len - sizeof(TSCKSUM), numOfRows, pDataCol->pData, - pDataCol->spaceSize, comp, buffer, bufferSize); - if (tlen <= 0) { - tsdbError("Failed to decompress column, file corrupted, len:%d comp:%d numOfRows:%d maxPoints:%d bufferSize:%d", - len, comp, numOfRows, maxPoints, bufferSize); - terrno = TSDB_CODE_TDB_FILE_CORRUPTED; - return -1; - } - pDataCol->len = tlen; - if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) { - dataColSetOffset(pDataCol, numOfRows); - } - } else { - // No need to decompress, just memcpy it - pDataCol->len = len - sizeof(TSCKSUM); - memcpy(pDataCol->pData, content, pDataCol->len); - if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) { - dataColSetOffset(pDataCol, numOfRows); - } - } - return 0; -} - -static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SBlock *pCompBlock, SBlockCol *pCompCol, - SDataCol *pDataCol) { - ASSERT(pDataCol->colId == pCompCol->colId); - int tsize = pDataCol->bytes * pCompBlock->numOfRows + COMP_OVERFLOW_BYTES; - pHelper->pBuffer = taosTRealloc(pHelper->pBuffer, pCompCol->len); - if (pHelper->pBuffer == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - - pHelper->compBuffer = taosTRealloc(pHelper->compBuffer, tsize); - if (pHelper->compBuffer == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - - int64_t offset = pCompBlock->offset + TSDB_GET_COMPCOL_LEN(pCompBlock->numOfCols) + pCompCol->offset; - if (lseek(pFile->fd, (off_t)offset, SEEK_SET) < 0) { - tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), TSDB_FILE_NAME(pFile), strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - if (taosRead(pFile->fd, pHelper->pBuffer, pCompCol->len) < pCompCol->len) { - tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pCompCol->len, TSDB_FILE_NAME(pFile), - strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - if (tsdbCheckAndDecodeColumnData(pDataCol, pHelper->pBuffer, pCompCol->len, pCompBlock->algorithm, - pCompBlock->numOfRows, pHelper->pRepo->config.maxRowsPerFileBlock, - pHelper->compBuffer, (int32_t)taosTSizeof(pHelper->compBuffer)) < 0) { - tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pHelper->pRepo), TSDB_FILE_NAME(pFile), - pCompCol->colId, offset); - return -1; - } - - return 0; -} - -static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SBlock *pCompBlock, SDataCols *pDataCols, int16_t *colIds, int numOfColIds) { - ASSERT(pCompBlock->numOfSubBlocks <= 1); - ASSERT(colIds[0] == 0); - - SFile * pFile = (pCompBlock->last) ? helperLastF(pHelper) : helperDataF(pHelper); - SBlockCol compCol = {0}; - - // If only load timestamp column, no need to load SBlockData part - if (numOfColIds > 1 && tsdbLoadCompData(pHelper, pCompBlock, NULL) < 0) goto _err; - - pDataCols->numOfRows = pCompBlock->numOfRows; - - int dcol = 0; - int ccol = 0; - for (int i = 0; i < numOfColIds; i++) { - int16_t colId = colIds[i]; - SDataCol *pDataCol = NULL; - SBlockCol *pCompCol = NULL; - - while (true) { - if (dcol >= pDataCols->numOfCols) { - pDataCol = NULL; - break; - } - pDataCol = &pDataCols->cols[dcol]; - if (pDataCol->colId > colId) { - pDataCol = NULL; - break; - } else { - dcol++; - if (pDataCol->colId == colId) break; - } - } - - if (pDataCol == NULL) continue; - ASSERT(pDataCol->colId == colId); - - if (colId == 0) { // load the key row - compCol.colId = colId; - compCol.len = pCompBlock->keyLen; - compCol.type = pDataCol->type; - compCol.offset = TSDB_KEY_COL_OFFSET; - pCompCol = &compCol; - } else { // load non-key rows - while (true) { - if (ccol >= pCompBlock->numOfCols) { - pCompCol = NULL; - break; - } - - pCompCol = &(pHelper->pCompData->cols[ccol]); - if (pCompCol->colId > colId) { - pCompCol = NULL; - break; - } else { - ccol++; - if (pCompCol->colId == colId) break; - } - } - - if (pCompCol == NULL) { - dataColSetNEleNull(pDataCol, pCompBlock->numOfRows, pDataCols->maxPoints); - continue; - } - - ASSERT(pCompCol->colId == pDataCol->colId); - } - - if (tsdbLoadColData(pHelper, pFile, pCompBlock, pCompCol, pDataCol) < 0) goto _err; - } - - return 0; - -_err: - return -1; -} - -static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SBlock *pCompBlock, SDataCols *pDataCols) { - ASSERT(pCompBlock->numOfSubBlocks <= 1); - - SFile *pFile = (pCompBlock->last) ? helperLastF(pHelper) : helperDataF(pHelper); - - pHelper->pBuffer = taosTRealloc(pHelper->pBuffer, pCompBlock->len); - if (pHelper->pBuffer == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - goto _err; - } - - SBlockData *pCompData = (SBlockData *)pHelper->pBuffer; - - int fd = pFile->fd; - if (lseek(fd, (off_t)pCompBlock->offset, SEEK_SET) < 0) { - tsdbError("vgId:%d tid:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid, - TSDB_FILE_NAME(pFile), strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - if (taosRead(fd, (void *)pCompData, pCompBlock->len) < pCompBlock->len) { - tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pCompBlock->len, - TSDB_FILE_NAME(pFile), strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - int32_t tsize = TSDB_GET_COMPCOL_LEN(pCompBlock->numOfCols); - if (!taosCheckChecksumWhole((uint8_t *)pCompData, tsize)) { - tsdbError("vgId:%d file %s block data is corrupted offset %" PRId64 " len %d", REPO_ID(pHelper->pRepo), - TSDB_FILE_NAME(pFile), (int64_t)(pCompBlock->offset), pCompBlock->len); - terrno = TSDB_CODE_TDB_FILE_CORRUPTED; - goto _err; - } - ASSERT(pCompData->numOfCols == pCompBlock->numOfCols); - - pDataCols->numOfRows = pCompBlock->numOfRows; - - // Recover the data - int ccol = 0; // loop iter for SBlockCol object - int dcol = 0; // loop iter for SDataCols object - while (dcol < pDataCols->numOfCols) { - SDataCol *pDataCol = &(pDataCols->cols[dcol]); - if (dcol != 0 && ccol >= pCompData->numOfCols) { - // Set current column as NULL and forward - dataColSetNEleNull(pDataCol, pCompBlock->numOfRows, pDataCols->maxPoints); - dcol++; - continue; - } - - int16_t tcolId = 0; - int32_t toffset = TSDB_KEY_COL_OFFSET; - int32_t tlen = pCompBlock->keyLen; - - if (dcol != 0) { - SBlockCol *pCompCol = &(pCompData->cols[ccol]); - tcolId = pCompCol->colId; - toffset = pCompCol->offset; - tlen = pCompCol->len; - } else { - ASSERT(pDataCol->colId == tcolId); - } - - if (tcolId == pDataCol->colId) { - if (pCompBlock->algorithm == TWO_STAGE_COMP) { - int zsize = pDataCol->bytes * pCompBlock->numOfRows + COMP_OVERFLOW_BYTES; - if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) { - zsize += (sizeof(VarDataLenT) * pCompBlock->numOfRows); - } - pHelper->compBuffer = taosTRealloc(pHelper->compBuffer, zsize); - if (pHelper->compBuffer == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - goto _err; - } - } - if (tsdbCheckAndDecodeColumnData(pDataCol, (char *)pCompData + tsize + toffset, tlen, pCompBlock->algorithm, - pCompBlock->numOfRows, pDataCols->maxPoints, pHelper->compBuffer, - (int32_t)taosTSizeof(pHelper->compBuffer)) < 0) { - tsdbError("vgId:%d file %s is broken at column %d block offset %" PRId64 " column offset %d", - REPO_ID(pHelper->pRepo), TSDB_FILE_NAME(pFile), tcolId, (int64_t)pCompBlock->offset, toffset); - goto _err; - } - if (dcol != 0) ccol++; - dcol++; - } else if (tcolId < pDataCol->colId) { - ccol++; - } else { - // Set current column as NULL and forward - dataColSetNEleNull(pDataCol, pCompBlock->numOfRows, pDataCols->maxPoints); - dcol++; - } - } - - return 0; - -_err: - return -1; -} - -static int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx) { - int tlen = 0; - - tlen += taosEncodeVariantI32(buf, pIdx->tid); - tlen += taosEncodeVariantU32(buf, pIdx->len); - tlen += taosEncodeVariantU32(buf, pIdx->offset); - tlen += taosEncodeFixedU8(buf, pIdx->hasLast); - tlen += taosEncodeVariantU32(buf, pIdx->numOfBlocks); - tlen += taosEncodeFixedU64(buf, pIdx->uid); - tlen += taosEncodeFixedU64(buf, pIdx->maxKey); - - return tlen; -} - -static void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx) { - uint8_t hasLast = 0; - uint32_t numOfBlocks = 0; - uint64_t value = 0; - - if ((buf = taosDecodeVariantI32(buf, &(pIdx->tid))) == NULL) return NULL; - if ((buf = taosDecodeVariantU32(buf, &(pIdx->len))) == NULL) return NULL; - if ((buf = taosDecodeVariantU32(buf, &(pIdx->offset))) == NULL) return NULL; - if ((buf = taosDecodeFixedU8(buf, &(hasLast))) == NULL) return NULL; - pIdx->hasLast = hasLast; - if ((buf = taosDecodeVariantU32(buf, &(numOfBlocks))) == NULL) return NULL; - pIdx->numOfBlocks = numOfBlocks; - if ((buf = taosDecodeFixedU64(buf, &value)) == NULL) return NULL; - pIdx->uid = (int64_t)value; - if ((buf = taosDecodeFixedU64(buf, &value)) == NULL) return NULL; - pIdx->maxKey = (TSKEY)value; - - return buf; -} - -static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey) { - STsdbCfg * pCfg = &(pHelper->pRepo->config); - STable * pTable = pCommitIter->pTable; - SBlockIdx * pIdx = &(pHelper->curCompIdx); - TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter); - int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5; - SBlock compBlock = {0}; - SMergeInfo mergeInfo = {0}; - SMergeInfo *pMergeInfo = &mergeInfo; - - ASSERT(pIdx->len <= 0 || keyFirst > pIdx->maxKey); - if (pIdx->hasLast) { // append to with last block - ASSERT(pIdx->len > 0); - SBlock *pCompBlock = blockAtIdx(pHelper, pIdx->numOfBlocks - 1); - ASSERT(pCompBlock->last && pCompBlock->numOfRows < pCfg->minRowsPerFileBlock); - tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock - pCompBlock->numOfRows, pDataCols, - NULL, 0, pCfg->update, pMergeInfo); - - ASSERT(pMergeInfo->rowsInserted == pMergeInfo->nOperations && pMergeInfo->nOperations == pDataCols->numOfRows); - - if (pDataCols->numOfRows > 0) { - ASSERT((pMergeInfo->keyFirst == dataColsKeyFirst(pDataCols)) && (pMergeInfo->keyLast == dataColsKeyLast(pDataCols))); - - if (pDataCols->numOfRows + pCompBlock->numOfRows < pCfg->minRowsPerFileBlock && - pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) { - if (tsdbWriteBlockToFile(pHelper, helperLastF(pHelper), pDataCols, &compBlock, true, false) < 0) return -1; - pMergeInfo->keyFirst = MIN(pMergeInfo->keyFirst, pCompBlock->keyFirst); - pMergeInfo->keyLast = MAX(pMergeInfo->keyLast, pCompBlock->keyLast); - if (tsdbAddSubBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1, pMergeInfo) < 0) return -1; - } else { - if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; - ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows); - - if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, pDataCols->numOfRows) < 0) return -1; - ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows + pDataCols->numOfRows); - - if (tsdbWriteBlockToProperFile(pHelper, pHelper->pDataCols[0], &compBlock) < 0) return -1; - if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1; - } - - if (pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false; - } - } else { - ASSERT(!pHelper->hasOldLastBlock); - tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock, pDataCols, NULL, 0, pCfg->update, pMergeInfo); - ASSERT(pMergeInfo->rowsInserted == pMergeInfo->nOperations && pMergeInfo->nOperations == pDataCols->numOfRows); - - if (pDataCols->numOfRows > 0) { - ASSERT((pMergeInfo->keyFirst == dataColsKeyFirst(pDataCols)) && (pMergeInfo->keyLast == dataColsKeyLast(pDataCols))); - if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1; - if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks) < 0) return -1; - } - } - -#ifndef NDEBUG - TSKEY keyNext = tsdbNextIterKey(pCommitIter->pIter); - ASSERT(keyNext == TSDB_DATA_TIMESTAMP_NULL || keyNext > pIdx->maxKey); -#endif - - return 0; -} - -static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey, - int *blkIdx) { - STsdbCfg * pCfg = &(pHelper->pRepo->config); - STable * pTable = pCommitIter->pTable; - SBlockIdx * pIdx = &(pHelper->curCompIdx); - SBlock compBlock = {0}; - TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter); - int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5; - SDataCols * pDataCols0 = pHelper->pDataCols[0]; - SMergeInfo mergeInfo = {0}; - SMergeInfo *pMergeInfo = &mergeInfo; - SBlock oBlock = {0}; - - SSkipListIterator slIter = {0}; - - ASSERT(keyFirst <= pIdx->maxKey); - - SBlock *pCompBlock = taosbsearch((void *)(&keyFirst), (void *)blockAtIdx(pHelper, *blkIdx), - pIdx->numOfBlocks - *blkIdx, sizeof(SBlock), compareKeyBlock, TD_GE); - ASSERT(pCompBlock != NULL); - int tblkIdx = (int32_t)(TSDB_GET_COMPBLOCK_IDX(pHelper, pCompBlock)); - oBlock = *pCompBlock; - - ASSERT((!TSDB_IS_LAST_BLOCK(&oBlock)) || (tblkIdx == pIdx->numOfBlocks - 1)); - - if ((!TSDB_IS_LAST_BLOCK(&oBlock)) && keyFirst < pCompBlock->keyFirst) { - while (true) { - tsdbLoadDataFromCache(pTable, pCommitIter->pIter, oBlock.keyFirst-1, defaultRowsInBlock, pDataCols, NULL, 0, - pCfg->update, pMergeInfo); - ASSERT(pMergeInfo->rowsInserted == pMergeInfo->nOperations && pMergeInfo->nOperations == pDataCols->numOfRows); - if (pDataCols->numOfRows == 0) break; - - if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1; - if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; - tblkIdx++; - } - ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) == TSDB_DATA_TIMESTAMP_NULL || - tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); - } else { - int16_t colId = 0; - if (tsdbLoadBlockDataCols(pHelper, &oBlock, NULL, &colId, 1) < 0) return -1; - - TSKEY keyLimit = (tblkIdx == pIdx->numOfBlocks - 1) ? maxKey : (blockAtIdx(pHelper, tblkIdx + 1)->keyFirst - 1); - - slIter = *(pCommitIter->pIter); - tsdbLoadDataFromCache(pTable, &slIter, keyLimit, INT_MAX, NULL, pDataCols0->cols[0].pData, pDataCols0->numOfRows, - pCfg->update, pMergeInfo); - - if (pMergeInfo->nOperations == 0) { - // Do nothing - ASSERT(pMergeInfo->rowsDeleteFailed >= 0); - *(pCommitIter->pIter) = slIter; - tblkIdx++; - } else if (oBlock.numOfRows + pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed == 0) { - // Delete the block and do some stuff - // ASSERT(pMergeInfo->keyFirst == INT64_MAX && pMergeInfo->keyFirst == INT64_MIN); - if (tsdbDeleteSuperBlock(pHelper, tblkIdx) < 0) return -1; - *pCommitIter->pIter = slIter; - if (oBlock.last && pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false; - } else if (tsdbCheckAddSubBlockCond(pHelper, &oBlock, pMergeInfo, pDataCols->maxPoints)) { - // Append as a sub-block of the searched block - tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, INT_MAX, pDataCols, pDataCols0->cols[0].pData, - pDataCols0->numOfRows, pCfg->update, pMergeInfo); - ASSERT(memcmp(pCommitIter->pIter, &slIter, sizeof(slIter)) == 0); - if (tsdbWriteBlockToFile(pHelper, oBlock.last ? helperLastF(pHelper) : helperDataF(pHelper), pDataCols, - &compBlock, oBlock.last, false) < 0) { - return -1; - } - if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, pMergeInfo) < 0) { - return -1; - } - tblkIdx++; - } else { - // load the block data, merge with the memory data - if (tsdbLoadBlockData(pHelper, &oBlock, NULL) < 0) return -1; - int round = 0; - int dIter = 0; - while (true) { - tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, keyLimit, defaultRowsInBlock, - pCfg->update); - - if (pDataCols->numOfRows == 0) break; - if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1; - - if (round == 0) { - if (oBlock.last && pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false; - if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; - } else { - if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; - } - - round++; - tblkIdx++; - } - } - } - - *blkIdx = tblkIdx; - return 0; -} - -static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, - TSKEY maxKey, int maxRows, int8_t update) { - TSKEY key1 = INT64_MAX; - TSKEY key2 = INT64_MAX; - STSchema *pSchema = NULL; - - ASSERT(maxRows > 0 && dataColsKeyLast(pDataCols) <= maxKey); - tdResetDataCols(pTarget); - - while (true) { - key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter); - bool isRowDel = false; - SDataRow row = tsdbNextIterRow(pCommitIter->pIter); - if (row == NULL || dataRowKey(row) > maxKey) { - key2 = INT64_MAX; - } else { - key2 = dataRowKey(row); - isRowDel = dataRowDeleted(row); - } - - if (key1 == INT64_MAX && key2 == INT64_MAX) break; - - if (key1 < key2) { - for (int i = 0; i < pDataCols->numOfCols; i++) { - dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows, - pTarget->maxPoints); - } - - pTarget->numOfRows++; - (*iter)++; - } else if (key1 > key2) { - if (!isRowDel) { - if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) { - pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, dataRowVersion(row)); - ASSERT(pSchema != NULL); - } - - tdAppendDataRowToDataCol(row, pSchema, pTarget); - } - - tSkipListIterNext(pCommitIter->pIter); - } else { - if (update) { - if (!isRowDel) { - if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) { - pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, dataRowVersion(row)); - ASSERT(pSchema != NULL); - } - - tdAppendDataRowToDataCol(row, pSchema, pTarget); - } - } else { - ASSERT(!isRowDel); - - for (int i = 0; i < pDataCols->numOfCols; i++) { - dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows, - pTarget->maxPoints); - } - - pTarget->numOfRows++; - } - (*iter)++; - tSkipListIterNext(pCommitIter->pIter); - } - - if (pTarget->numOfRows >= maxRows) break; - } -} - -static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols, SBlock *pCompBlock) { - STsdbCfg *pCfg = &(pHelper->pRepo->config); - SFile * pFile = NULL; - bool isLast = false; - - ASSERT(pDataCols->numOfRows > 0); - - if (pDataCols->numOfRows >= pCfg->minRowsPerFileBlock) { - pFile = helperDataF(pHelper); - } else { - isLast = true; - pFile = TSDB_NLAST_FILE_OPENED(pHelper) ? helperNewLastF(pHelper) : helperLastF(pHelper); - } - - ASSERT(pFile->fd > 0); - - if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, pCompBlock, isLast, true) < 0) return -1; - - return 0; -} - -static bool tsdbCheckAddSubBlockCond(SRWHelper *pHelper, SBlock *pCompBlock, SMergeInfo *pMergeInfo, int maxOps) { - STsdbCfg *pCfg = &(pHelper->pRepo->config); - int mergeRows = pCompBlock->numOfRows + pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed; - - ASSERT(mergeRows > 0); - - if (pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && pMergeInfo->nOperations <= maxOps) { - if (pCompBlock->last) { - if (!TSDB_NLAST_FILE_OPENED(pHelper) && mergeRows < pCfg->minRowsPerFileBlock) return true; - } else { - if (mergeRows < pCfg->maxRowsPerFileBlock) return true; - } - } - - return false; -} \ No newline at end of file diff --git a/src/tsdb/src/tsdbStore.c b/src/tsdb/src/tsdbStore.c index 14208886480abb81027c1149cc2aaa7b65ab8a06..2fb1e06221a3407e128935f25ea300450ef5df2c 100644 --- a/src/tsdb/src/tsdbStore.c +++ b/src/tsdb/src/tsdbStore.c @@ -16,7 +16,7 @@ #define _DEFAULT_SOURCE #define TAOS_RANDOM_FILE_FAIL_TEST -#include "tsdbMain.h" +#include "tsdbint.h" typedef struct { uint64_t uid;