提交 18807c14 编写于 作者: H Hongze Cheng

correct a lot of compile issue

上级 5b8c0159
......@@ -78,6 +78,10 @@ int tsdbEndTxnWithError(STsdbFS *pfs);
void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile);
int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet);
void tsdbFSIterInit(SFSIter *pIter, STsdbFS *pfs, int direction);
void tsdbFSIterSeek(SFSIter *pIter, int fid);
SDFileSet *tsdbFSIterNext(SFSIter *pIter);
static FORCE_INLINE int tsdbRLockFS(STsdbFS* pFs) {
int code = pthread_rwlock_rdlock(&(pFs->lock));
if (code != 0) {
......
......@@ -59,11 +59,12 @@ typedef struct {
int fd;
} SMFile;
void tsdbInitMFile(SMFile* pMFile, SDiskID did, int vid, int ver);
void tsdbInitMFile(SMFile* pMFile, SDiskID did, int vid, uint32_t ver);
void tsdbInitMFileEx(SMFile* pMFile, SMFile* pOMFile);
int tsdbEncodeSMFile(void** buf, SMFile* pMFile);
void* tsdbDecodeSMFile(void* buf, SMFile* pMFile);
int tsdbApplyMFileChange(const SMFile* from, const SMFile* to);
int tsdbApplyMFileChange(SMFile* from, SMFile* to);
int tsdbApplyMFileChange(SMFile* from, SMFile* to);
static FORCE_INLINE int tsdbOpenMFile(SMFile* pMFile, int flags) {
ASSERT(!TSDB_FILE_OPENED(pMFile));
......@@ -287,11 +288,11 @@ typedef struct {
#define TSDB_FSET_LEVEL(s) TSDB_FILE_LEVEL(TSDB_DFILE_IN_SET(s, 0))
#define TSDB_FSET_ID(s) TSDB_FILE_ID(TSDB_DFILE_IN_SET(s, 0))
void tsdbInitDFileSet(SDFileSet* pSet, SDiskID did, int vid, int fid, int ver);
void tsdbInitDFileSet(SDFileSet *pSet, SDiskID did, int vid, int fid, uint32_t ver);
void tsdbInitDFileSetEx(SDFileSet* pSet, SDFileSet* pOSet);
int tsdbEncodeDFileSet(void** buf, SDFileSet* pSet);
void* tsdbDecodeDFileSet(void* buf, SDFileSet* pSet);
int tsdbApplyDFileSetChange(const SDFileSet* from, const SDFileSet* to);
int tsdbApplyDFileSetChange(SDFileSet* from, SDFileSet* to);
static FORCE_INLINE void tsdbCloseDFileSet(SDFileSet* pSet) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_KV_H_
#define _TD_TSDB_KV_H_
#ifdef __cplusplus
extern "C" {
#endif
#define KVSTORE_FILE_VERSION ((uint32_t)0)
typedef int (*iterFunc)(void*, void* cont, int contLen);
typedef void (*afterFunc)(void*);
typedef struct {
SMFile f;
SHashObj* map;
iterFunc iFunc;
afterFunc aFunc;
void* appH;
} SKVStore;
#define KVSTORE_MAGIC(s) (s)->f.info.magic
int tdCreateKVStore(char* fname);
int tdDestroyKVStore(char* fname);
SKVStore* tdOpenKVStore(char* fname, iterFunc iFunc, afterFunc aFunc, void* appH);
void tdCloseKVStore(SKVStore* pStore);
int tdKVStoreStartCommit(SKVStore* pStore);
int tdUpdateKVStoreRecord(SKVStore* pStore, uint64_t uid, void* cont, int contLen);
int tdDropKVStoreRecord(SKVStore* pStore, uint64_t uid);
int tdKVStoreEndCommit(SKVStore* pStore);
void tsdbGetStoreInfo(char* fname, uint32_t* magic, int64_t* size);
#ifdef __cplusplus
}
#endif
#endif /*_TD_TSDB_KV_H_*/
\ No newline at end of file
......@@ -51,7 +51,6 @@ typedef struct {
STable** tables;
SList* superList;
SHashObj* uidMap;
SKVStore* pStore;
int maxRowBytes;
int maxCols;
} STsdbMeta;
......@@ -63,7 +62,7 @@ typedef struct {
#define TABLE_UID(t) (t)->tableId.uid
#define TABLE_TID(t) (t)->tableId.tid
#define TABLE_SUID(t) (t)->suid
#define TSDB_META_FILE_MAGIC(m) KVSTORE_MAGIC((m)->pStore)
// #define TSDB_META_FILE_MAGIC(m) KVSTORE_MAGIC((m)->pStore)
#define TSDB_RLOCK_TABLE(t) taosRLockLatch(&((t)->latch))
#define TSDB_RUNLOCK_TABLE(t) taosRUnLockLatch(&((t)->latch))
#define TSDB_WLOCK_TABLE(t) taosWLockLatch(&((t)->latch))
......
......@@ -109,9 +109,8 @@ void tsdbCloseAndUnsetFSet(SReadH *pReadh);
int tsdbLoadBlockIdx(SReadH *pReadh);
int tsdbSetReadTable(SReadH *pReadh, STable *pTable);
int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget);
int tsdbLoadBlockData(SReadH *pReadh, const SBlock *pBlock, const SBlockInfo *pBlockInfo);
int tsdbLoadBlockDataCols(SReadH *pReadh, const SBlock *pBlock, const SBlockInfo *pBlockInfo, const int16_t *colIds,
int numOfColsIds);
int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlockInfo);
int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, int16_t *colIds, int numOfColsIds);
int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock);
int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx);
void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx);
......
......@@ -23,6 +23,8 @@
#include <inttypes.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <semaphore.h>
#include <dirent.h>
#include "os.h"
#include "tlog.h"
......@@ -98,7 +100,7 @@ int tsdbUnlockRepo(STsdbRepo* pRepo);
char* tsdbGetDataDirName(char* rootDir);
int tsdbGetNextMaxTables(int tid);
STsdbMeta* tsdbGetMeta(TSDB_REPO_T* pRepo);
STsdbFileH* tsdbGetFile(TSDB_REPO_T* pRepo);
// STsdbFileH* tsdbGetFile(TSDB_REPO_T* pRepo);
int tsdbCheckCommit(STsdbRepo* pRepo);
static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) {
......
......@@ -54,6 +54,40 @@ typedef struct {
#define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&((ch)->readh))
#define TSDB_COMMIT_DEFAULT_ROWS(ch) (TSDB_COMMIT_REPO(ch)->config.maxRowsPerFileBlock * 4 / 5)
static int tsdbCommitMeta(STsdbRepo *pRepo);
static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen);
static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid);
static int tsdbCommitTSData(STsdbRepo *pRepo);
static int tsdbStartCommit(STsdbRepo *pRepo);
static void tsdbEndCommit(STsdbRepo *pRepo, int eno);
static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid);
static int tsdbCreateCommitIters(SCommitH *pCommith);
static void tsdbDestroyCommitIters(SCommitH *pCommith);
static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key);
static int tsdbInitCommitH(SCommitH *pCommith, STsdbRepo *pRepo);
static void tsdbDestroyCommitH(SCommitH *pCommith);
static void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn);
static int tsdbGetFidLevel(int fid, SRtn *pRtn);
static int tsdbNextCommitFid(SCommitH *pCommith);
static int tsdbCommitToTable(SCommitH *pCommith, int tid);
static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable);
static int tsdbComparKeyBlock(const void *arg1, const void *arg2);
static int tsdbWriteBlockInfo(SCommitH *pCommih);
static int tsdbWriteBlockIdx(SCommitH *pCommih);
static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData);
static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx);
static int tsdbMoveBlock(SCommitH *pCommith, int bidx);
static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks);
static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit,
bool isLastOneBlock);
static void tsdbResetCommitFile(SCommitH *pCommith);
static void tsdbResetCommitTable(SCommitH *pCommith);
static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid);
static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError);
static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo);
static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
TSKEY maxKey, int maxRows, int8_t update);
void *tsdbCommitData(STsdbRepo *pRepo) {
if (tsdbStartCommit(pRepo) < 0) {
tsdbError("vgId:%d failed to commit data while startting to commit since %s", REPO_ID(pRepo), tstrerror(terrno));
......@@ -92,6 +126,7 @@ static int tsdbCommitMeta(STsdbRepo *pRepo) {
SActObj * pAct = NULL;
SActCont * pCont = NULL;
SListNode *pNode = NULL;
SDiskID did;
ASSERT(pOMFile != NULL || listNEles(pMem->actList) > 0);
......@@ -103,13 +138,15 @@ static int tsdbCommitMeta(STsdbRepo *pRepo) {
// Create/Open a meta file or open the existing file
if (pOMFile == NULL) {
// Create a new meta file
tsdbInitMFile(&mf, {.level = TFS_PRIMARY_LEVEL, .id = TFS_PRIMARY_ID}, REPO_ID(pRepo), pfs->nstatus->meta.version);
did.level = TFS_PRIMARY_LEVEL;
did.id = TFS_PRIMARY_ID;
tsdbInitMFile(&mf, did, REPO_ID(pRepo), pfs->nstatus->meta.version);
if (tsdbCreateMFile(&mf) < 0) {
return -1;
}
} else {
tsdbInitMFile(&mf, pOMFile);
tsdbInitMFileEx(&mf, pOMFile);
if (tsdbOpenMFile(&mf, O_WRONLY) < 0) {
return -1;
}
......@@ -178,12 +215,12 @@ static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void
rInfo.uid = uid;
rInfo.size = contLen;
tlen = tsdbEncodeKVRecord((void **)(&pBuf), pRInfo);
if (tsdbAppendMFile(pMFile, buf, tlen) < tlen) {
int tlen = tsdbEncodeKVRecord((void **)(&pBuf), &rInfo);
if (tsdbAppendMFile(pMFile, buf, tlen, NULL) < tlen) {
return -1;
}
if (tsdbAppendMFile(pMFile, cont, contLen) < contLen) {
if (tsdbAppendMFile(pMFile, cont, contLen, NULL) < contLen) {
return -1;
}
......@@ -214,16 +251,16 @@ static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid) {
rInfo.size = pRecord->size;
void *pBuf = buf;
tdEncodeKVRecord(&pBuf, &rInfo);
tsdbEncodeKVRecord(&pBuf, &rInfo);
if (tsdbAppendMFile(pMFile, buf, POINTER_DISTANCE(pBuf, buf), NULL) < 0) {
return -1;
}
pMFile->meta.magic = taosCalcChecksum(pStore->info.magic, (uint8_t *)buf, (uint32_t)POINTER_DISTANCE(pBuf, buf));
pMFile->meta.nDels++;
pMFile->meta.nRecords--;
pMFile->meta.tombSize += (rInfo.size + sizeof(SKVRecord) * 2);
pMFile->info.magic = taosCalcChecksum(pMFile->info.magic, (uint8_t *)buf, (uint32_t)POINTER_DISTANCE(pBuf, buf));
pMFile->info.nDels++;
pMFile->info.nRecords--;
pMFile->info.tombSize += (rInfo.size + sizeof(SKVRecord) * 2);
taosHashRemove(pfs->metaCache, (void *)(&uid), sizeof(uid));
return 0;
......@@ -233,8 +270,8 @@ static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid) {
// =================== Commit Time-Series Data
static int tsdbCommitTSData(STsdbRepo *pRepo) {
SMemTable *pMem = pRepo->imem;
STsdbCfg * pCfg = REPO_CFG(pRepo);
SCommitH commith = {0};
STsdbFS * pfs = REPO_FS(pRepo);
SDFileSet *pSet = NULL;
SDFileSet nSet;
int fid;
......@@ -273,17 +310,17 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) {
if (level > TSDB_FSET_LEVEL(pSet)) {
// Need to move the FSET to higher level
if (tsdbCopyDFileSet(*pSet, level, id, &nSet) < 0) {
if (tsdbCopyDFileSet(pSet, &nSet) < 0) {
tsdbDestroyCommitH(&commith);
return -1;
}
if (tsdbUpdateDFileSet(pRepo, &nSet) < 0) {
if (tsdbUpdateDFileSet(pfs, &nSet) < 0) {
tsdbDestroyCommitH(&commith);
return -1;
}
} else {
if (tsdbUpdateDFileSet(pRepo, pSet) < 0) {
if (tsdbUpdateDFileSet(pfs, pSet) < 0) {
tsdbDestroyCommitH(&commith);
return -1;
}
......@@ -307,7 +344,7 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) {
}
fid = tsdbNextCommitFid(&commith);
if (tsdbCommitToFile(pCSet, &commith, cfid) < 0) {
if (tsdbCommitToFile(&commith, pCSet, cfid) < 0) {
tsdbDestroyCommitH(&commith);
return -1;
}
......@@ -324,15 +361,17 @@ static int tsdbStartCommit(STsdbRepo *pRepo) {
tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64 " meta rows: %d",
REPO_ID(pRepo), pMem->keyFirst, pMem->keyLast, pMem->numOfRows, listNEles(pMem->actList));
if (tsdbFSNewTxn(pRepo) < 0) return -1;
if (tsdbStartTxn(REPO_FS(pRepo)) < 0) return -1;
pRepo->code = TSDB_CODE_SUCCESS;
return 0;
}
static void tsdbEndCommit(STsdbRepo *pRepo, int eno) {
if (tsdbFSEndTxn(pRepo, eno != TSDB_CODE_SUCCESS) < 0) {
eno = terrno;
if (eno != TSDB_CODE_SUCCESS) {
tsdbEndTxnWithError(REPO_FS(pRepo));
} else {
tsdbEndTxn(REPO_FS(pRepo));
}
tsdbInfo("vgId:%d commit over, %s", REPO_ID(pRepo), (eno == TSDB_CODE_SUCCESS) ? "succeed" : "failed");
......@@ -348,6 +387,7 @@ static void tsdbEndCommit(STsdbRepo *pRepo, int eno) {
sem_post(&(pRepo->readyToCommit));
}
#if 0
static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) {
for (int i = 0; i < nIters; i++) {
TSKEY nextKey = tsdbNextIterKey((iters + i)->pIter);
......@@ -355,6 +395,7 @@ static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TS
}
return false;
}
#endif
static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith);
......@@ -383,9 +424,14 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
}
}
if (tsdbWriteBlockIdx(pCommith) < 0) {
tsdbCloseCommitFile(pCommith, true);
return -1;
}
tsdbCloseCommitFile(pCommith, false);
if (tsdbUpdateDFileSet(pRepo, &(pCommith->wSet)) < 0) {
if (tsdbUpdateDFileSet(REPO_FS(pRepo), &(pCommith->wSet)) < 0) {
// TODO
return -1;
}
......@@ -393,7 +439,7 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
return 0;
}
static SCommitIter *tsdbCreateCommitIters(SCommitH *pCommith) {
static int tsdbCreateCommitIters(SCommitH *pCommith) {
STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith);
SMemTable *pMem = pRepo->imem;
STsdbMeta *pMeta = pRepo->tsdbMeta;
......@@ -405,7 +451,7 @@ static SCommitIter *tsdbCreateCommitIters(SCommitH *pCommith) {
return -1;
}
if (tsdbRLockRepoMeta(pRepo) < 0) return -1
if (tsdbRLockRepoMeta(pRepo) < 0) return -1;
// reference all tables
for (int i = 0; i < pMem->maxTables; i++) {
......@@ -438,7 +484,7 @@ static void tsdbDestroyCommitIters(SCommitH *pCommith) {
for (int i = 1; i < pCommith->niters; i++) {
if (pCommith->iters[i].pTable != NULL) {
tsdbUnRefTable(pCommith->iters[i].pTable);
tSkipListDestroyIter(iters[i].pIter);
tSkipListDestroyIter(pCommith->iters[i].pIter);
}
}
......@@ -480,10 +526,7 @@ static int tsdbInitCommitH(SCommitH *pCommith, STsdbRepo *pRepo) {
}
// Init file iterator
if (tsdbInitFSIter(pRepo, &(pCommith->fsIter)) < 0) {
tsdbDestroyCommitH(pCommith);
return -1;
}
tsdbFSIterInit(&(pCommith->fsIter), REPO_FS(pRepo), TSDB_FS_ITER_FORWARD);
if (tsdbCreateCommitIters(pCommith) < 0) {
tsdbDestroyCommitH(pCommith);
......@@ -839,9 +882,9 @@ static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCo
pBlockCol->colId = pDataCol->colId;
pBlockCol->type = pDataCol->type;
if (tDataTypeDesc[pDataCol->type].getStatisFunc) {
(*tDataTypeDesc[pDataCol->type].getStatisFunc)(
(TSKEY *)(pDataCols->cols[0].pData), pDataCol->pData, rowsToWrite, &(pBlockCol->min), &(pBlockCol->max),
&(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex), &(pBlockCol->numOfNull));
(*tDataTypeDesc[pDataCol->type].getStatisFunc)(pDataCol->pData, rowsToWrite, &(pBlockCol->min), &(pBlockCol->max),
&(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex),
&(pBlockCol->numOfNull));
}
nColsNotAllNull++;
}
......@@ -915,7 +958,7 @@ static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCo
tsdbUpdateDFileMagic(pDFile, POINTER_SHIFT(pBlockData, tsize - sizeof(TSCKSUM)));
// Write the whole block to file
if (tsdbWriteDFile(pDFile, (void *)pBlockData, lsize < lsize)) {
if (tsdbWriteDFile(pDFile, (void *)pBlockData, lsize) < lsize) {
return -1;
}
......@@ -1164,7 +1207,7 @@ static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) {
static int tsdbMoveBlock(SCommitH *pCommith, int bidx) {
SBlock *pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
SDFile *pCommitF = (pBlock->last) ? TSDB_COMMIT_LAST_FILE(pCommith) : TSDB_COMMIT_DATA_FILE(pCommith);
SDFile *pReadF = (pBlock->last) ? TSDB_READ_LAST_FILE(&(pCommith->readh)) : TSDB_READ_DATA_FILE(&(pCommith->readh));
// SDFile *pReadF = (pBlock->last) ? TSDB_READ_LAST_FILE(&(pCommith->readh)) : TSDB_READ_DATA_FILE(&(pCommith->readh));
SBlock block;
if ((pBlock->last && pCommith->isLFileSame) || ((!pBlock->last) && pCommith->isDFileSame)) {
......@@ -1172,7 +1215,7 @@ static int tsdbMoveBlock(SCommitH *pCommith, int bidx) {
if (tsdbCommitAddBlock(pCommith, pBlock, NULL, 0) < 0) return -1;
} else {
block = *pBlock;
block.offset = sizeof(SBlock) * taosArrayGetSize(pCommith->aSupBlock);
block.offset = sizeof(SBlock) * taosArrayGetSize(pCommith->aSupBlk);
if (tsdbCommitAddBlock(pCommith, &block, POINTER_SHIFT(pCommith->readh.pBlkInfo, pBlock->offset),
pBlock->numOfSubBlocks) < 0) {
......@@ -1235,8 +1278,8 @@ static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols
if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, isLast, true) < 0) return -1;
if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) return -1;
}
return 0;
}
static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
......@@ -1323,6 +1366,7 @@ static void tsdbResetCommitTable(SCommitH *pCommith) {
static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
int level, id;
SDiskID did;
SDFileSet *pWSet = TSDB_COMMIT_WRITE_FSET(pCommith);
tfsAllocDisk(tsdbGetFidLevel(fid, &(pCommith->rtn)), &level, &id);
......@@ -1350,11 +1394,13 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
// Set and open commit FSET
if (pSet == NULL || level > TSDB_FSET_LEVEL(pSet)) {
// Create new FSET
tsdbInitDFileSet(pWSet, TSDB_COMMIT_REPO_ID(pCommith), fid, pCommith->version, level, id);
did.level = level;
did.id = id;
tsdbInitDFileSet(pWSet, did, TSDB_COMMIT_REPO_ID(pCommith), fid, pCommith->version);
if (tsdbOpenDFileSet(pWSet, O_WRONLY | O_CREAT) < 0) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
remove(TSDB_FILE_FULL_NAME(pWSet, ftype));
remove(TSDB_FILE_FULL_NAME(TSDB_DFILE_IN_SET(pWSet, ftype)));
}
if (pCommith->isRFileSet) {
......@@ -1367,7 +1413,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
tsdbCloseDFileSet(pWSet);
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
remove(TSDB_FILE_FULL_NAME(pWSet, ftype));
remove(TSDB_FILE_FULL_NAME(TSDB_DFILE_IN_SET(pWSet, ftype)));
}
if (pCommith->isRFileSet) {
......@@ -1384,8 +1430,10 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
// TSDB_FILE_HEAD
SDFile *pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith);
tsdbInitDFile(pWHeadf, REPO_ID(pRepo), fid, pCommith->version, level, id, NULL, TSDB_FILE_HEAD);
if (tsdbCreateAndOpenDFile(pWHeadf) < 0) {
did.level = level;
did.id = id;
tsdbInitDFile(pWHeadf, did, TSDB_COMMIT_REPO_ID(pCommith), fid, pCommith->version, TSDB_FILE_HEAD);
if (tsdbCreateDFile(pWHeadf) < 0) {
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1;
......@@ -1395,7 +1443,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
// TSDB_FILE_DATA
SDFile *pRDataf = TSDB_READ_DATA_FILE(&(pCommith->readh));
SDFile *pWDataf = TSDB_COMMIT_DATA_FILE(pCommith);
tsdbInitDFileWithOld(pWHeadf, pRDataf);
tsdbInitDFileEx(pWHeadf, pRDataf);
if (tsdbOpenDFile(pWDataf, O_WRONLY) < 0) {
tsdbCloseDFile(pWHeadf);
remove(TSDB_FILE_FULL_NAME(pWHeadf));
......@@ -1410,10 +1458,10 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
SDFile *pRLastf = TSDB_READ_LAST_FILE(&(pCommith->readh));
SDFile *pWLastf = TSDB_COMMIT_LAST_FILE(pCommith);
if (pRLastf->info.size < 32 * 1024) {
tsdbInitDFileWithOld(pWLastf, pRLastf);
tsdbInitDFileEx(pWLastf, pRLastf);
pCommith->isLFileSame = true;
} else {
tsdbInitDFile(pWLastf, REPO_ID(pRepo), fid, pCommith->version, level, id, NULL, TSDB_FILE_LAST);
tsdbInitDFile(pWLastf, did, TSDB_COMMIT_REPO_ID(pCommith), fid, pCommith->version, TSDB_FILE_LAST);
pCommith->isLFileSame = false;
}
if (tsdbOpenDFile(pWLastf, O_WRONLY) < 0) {
......
......@@ -13,11 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tglobal.h"
#include "tlist.h"
#include "tref.h"
#include "tsdbMain.h"
#include "tsdbint.h"
typedef struct {
bool stop;
......
......@@ -24,6 +24,14 @@ static const char *TSDB_FNAME_SUFFIX[] = {
"manifest" // TSDB_FILE_MANIFEST
};
static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, char *fname);
static int tsdbEncodeMFInfo(void **buf, SMFInfo *pInfo);
static void *tsdbDecodeMFInfo(void *buf, SMFInfo *pInfo);
static int tsdbRollBackMFile(SMFile *pMFile);
static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo);
static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo);
static int tsdbRollBackDFile(SDFile *pDFile);
// ============== SMFile
void tsdbInitMFile(SMFile *pMFile, SDiskID did, int vid, uint32_t ver) {
char fname[TSDB_FILENAME_LEN];
......@@ -67,7 +75,7 @@ int tsdbApplyMFileChange(SMFile *from, SMFile *to) {
} else {
if (tfsIsSameFile(TSDB_FILE_F(from), TSDB_FILE_F(to))) {
if (from->info.size > to->info.size) {
tsdbRollbackMFile(to);
tsdbRollBackMFile(to);
}
} else {
tsdbRemoveMFile(from);
......@@ -78,7 +86,7 @@ int tsdbApplyMFileChange(SMFile *from, SMFile *to) {
return 0;
}
static int tsdbRollBackMFile(const SMFile *pMFile) {
static int tsdbRollBackMFile(SMFile *pMFile) {
SMFile mf = *pMFile;
if (tsdbOpenMFile(&mf, O_WRONLY) < 0) {
......@@ -172,7 +180,7 @@ void tsdbInitDFile(SDFile *pDFile, SDiskID did, int vid, int fid, uint32_t ver,
pDFile->info.magic = TSDB_FILE_INIT_MAGIC;
tsdbGetFilename(vid, 0, ver, ftype, fname);
tfsInitFile(&(pDFile->f), level, id, fname);
tfsInitFile(&(pDFile->f), did.level, did.id, fname);
}
void tsdbInitDFileEx(SDFile *pDFile, SDFile *pODFile) {
......@@ -271,7 +279,7 @@ static int tsdbApplyDFileChange(SDFile *from, SDFile *to) {
} else {
if (tfsIsSameFile(TSDB_FILE_F(from), TSDB_FILE_F(to))) {
if (from->info.size > to->info.size) {
tsdbRollbackDFile(to);
tsdbRollBackDFile(to);
}
} else {
tsdbRemoveDFile(from);
......@@ -282,7 +290,7 @@ static int tsdbApplyDFileChange(SDFile *from, SDFile *to) {
return 0;
}
static int tsdbRollBackDFile(const SDFile *pDFile) {
static int tsdbRollBackDFile(SDFile *pDFile) {
SDFile df = *pDFile;
if (tsdbOpenDFile(&df, O_WRONLY) < 0) {
......@@ -328,7 +336,7 @@ int tsdbEncodeDFileSet(void **buf, SDFileSet *pSet) {
tlen += tsdbEncodeSDFile(buf, TSDB_DFILE_IN_SET(pSet, ftype));
}
return tlen
return tlen;
}
void *tsdbDecodeDFileSet(void *buf, SDFileSet *pSet) {
......@@ -338,7 +346,7 @@ void *tsdbDecodeDFileSet(void *buf, SDFileSet *pSet) {
return buf;
}
int tsdbApplyDFileSetChange(const SDFileSet *from, const SDFileSet *to) {
int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
if (tsdbApplyDFileChange(TSDB_DFILE_IN_SET(from, ftype), TSDB_DFILE_IN_SET(to, ftype)) < 0) {
return -1;
......@@ -366,7 +374,7 @@ int tsdbUpdateDFileSetHeader(SDFileSet *pSet) {
return -1;
}
}
return 0
return 0;
}
static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, char *fname) {
......@@ -374,15 +382,15 @@ static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, c
if (ftype < TSDB_FILE_MAX) {
if (ver == 0) {
snprintf(fname, "vnode/vnode%d/tsdb/data/v%df%d.%s", vid, vid, fid, TSDB_FNAME_SUFFIX[ftype]);
snprintf(fname, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/data/v%df%d.%s", vid, vid, fid, TSDB_FNAME_SUFFIX[ftype]);
} else {
snprintf(fname, "vnode/vnode%d/tsdb/data/v%df%d.%s-%012" PRIu32, vid, vid, fid, TSDB_FNAME_SUFFIX[ftype], ver);
snprintf(fname, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/data/v%df%d.%s-%012" PRIu32, vid, vid, fid, TSDB_FNAME_SUFFIX[ftype], ver);
}
} else {
if (ver == 0) {
snprintf(fname, "vnode/vnode%d/tsdb/%s", vid, TSDB_FNAME_SUFFIX[ftype]);
snprintf(fname, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/%s", vid, TSDB_FNAME_SUFFIX[ftype]);
} else {
snprintf(fname, "vnode/vnode%d/tsdb/%s-%012" PRIu32, vid, TSDB_FNAME_SUFFIX[ftype], ver);
snprintf(fname, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/%s-%012" PRIu32, vid, TSDB_FNAME_SUFFIX[ftype], ver);
}
}
}
\ No newline at end of file
......@@ -14,14 +14,7 @@
*/
// no test file errors here
#include "tsdbMain.h"
#include "os.h"
#include "talgo.h"
#include "taosdef.h"
#include "tchecksum.h"
#include "tscompression.h"
#include "tsdb.h"
#include "tulog.h"
#include "tsdbint.h"
#define TSDB_CFG_FILE_NAME "config"
#define TSDB_DATA_DIR_NAME "data"
......@@ -388,7 +381,7 @@ int tsdbCheckCommit(STsdbRepo *pRepo) {
}
STsdbMeta * tsdbGetMeta(TSDB_REPO_T *pRepo) { return ((STsdbRepo *)pRepo)->tsdbMeta; }
STsdbFileH * tsdbGetFile(TSDB_REPO_T *pRepo) { return ((STsdbRepo *)pRepo)->tsdbFileH; }
// STsdbFileH * tsdbGetFile(TSDB_REPO_T *pRepo) { return ((STsdbRepo *)pRepo)->tsdbFileH; }
STsdbRepoInfo *tsdbGetStatus(TSDB_REPO_T *pRepo) { return NULL; }
// ----------------- LOCAL FUNCTIONS -----------------
......
......@@ -13,8 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdb.h"
#include "tsdbMain.h"
#include "tsdbint.h"
#define TSDB_DATA_SKIPLIST_LEVEL 5
#define TSDB_MAX_INSERT_BATCH 512
......
......@@ -12,13 +12,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdlib.h>
#include "hash.h"
#include "taosdef.h"
#include "tchecksum.h"
#include "tsdb.h"
#include "tsdbMain.h"
#include "tskiplist.h"
#include "tsdbint.h"
#define TSDB_SUPER_TABLE_SL_LEVEL 5
#define DEFAULT_TAG_INDEX_COLUMN 0
......@@ -479,11 +473,11 @@ int tsdbOpenMeta(STsdbRepo *pRepo) {
goto _err;
}
pMeta->pStore = tdOpenKVStore(fname, tsdbRestoreTable, tsdbOrgMeta, (void *)pRepo);
if (pMeta->pStore == NULL) {
tsdbError("vgId:%d failed to open TSDB meta while open the kv store since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
// pMeta->pStore = tdOpenKVStore(fname, tsdbRestoreTable, tsdbOrgMeta, (void *)pRepo);
// if (pMeta->pStore == NULL) {
// tsdbError("vgId:%d failed to open TSDB meta while open the kv store since %s", REPO_ID(pRepo), tstrerror(terrno));
// goto _err;
// }
tsdbDebug("vgId:%d open TSDB meta succeed", REPO_ID(pRepo));
tfree(fname);
......@@ -500,7 +494,7 @@ int tsdbCloseMeta(STsdbRepo *pRepo) {
STable * pTable = NULL;
if (pMeta == NULL) return 0;
tdCloseKVStore(pMeta->pStore);
// tdCloseKVStore(pMeta->pStore);
for (int i = 1; i < pMeta->maxTables; i++) {
tsdbFreeTable(pMeta->tables[i]);
}
......@@ -610,7 +604,7 @@ void tsdbUpdateTableSchema(STsdbRepo *pRepo, STable *pTable, STSchema *pSchema,
}
// ------------------ LOCAL FUNCTIONS ------------------
static int tsdbRestoreTable(void *pHandle, void *cont, int contLen) {
static UNUSED_FUNC int tsdbRestoreTable(void *pHandle, void *cont, int contLen) {
STsdbRepo *pRepo = (STsdbRepo *)pHandle;
STable * pTable = NULL;
......@@ -631,7 +625,7 @@ static int tsdbRestoreTable(void *pHandle, void *cont, int contLen) {
return 0;
}
static void tsdbOrgMeta(void *pHandle) {
static UNUSED_FUNC void tsdbOrgMeta(void *pHandle) {
STsdbRepo *pRepo = (STsdbRepo *)pHandle;
STsdbMeta *pMeta = pRepo->tsdbMeta;
......
......@@ -22,7 +22,7 @@
#include "../../query/inc/qAst.h" // todo move to common module
#include "tlosertree.h"
#include "tsdb.h"
#include "tsdbMain.h"
#include "tsdbint.h"
#define EXTRA_BYTES 2
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
......@@ -54,7 +54,7 @@ typedef struct SQueryFilePos {
} SQueryFilePos;
typedef struct SDataBlockLoadInfo {
SFileGroup* fileGroup;
SDFileSet* fileGroup;
int32_t slot;
int32_t tid;
SArray* pLoadedCols;
......@@ -113,9 +113,9 @@ typedef struct STsdbQueryHandle {
bool cachelastrow; // check if last row cached
void* qinfo; // query info handle, for debug purpose
int32_t type; // query type: retrieve all data blocks, 2. retrieve only last row, 3. retrieve direct prev|next rows
SFileGroup* pFileGroup;
SFileGroupIter fileIter;
SRWHelper rhelper;
SDFileSet* pFileGroup;
SFSIter fileIter;
SReadH rhelper;
STableBlockInfo* pDataBlockInfo;
SDataCols *pDataCols; // in order to hold current file data block
......@@ -295,7 +295,7 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(TSDB_REPO_T* tsdb, STsdbQueryCond*
pQueryHandle->locateStart = false;
pQueryHandle->pMemRef = pMemRef;
if (tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb) != 0) {
if (tsdbInitReadH(&pQueryHandle->rhelper, (STsdbRepo*) tsdb) != 0) {
goto out_of_memory;
}
......@@ -716,12 +716,12 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
pCheckInfo->numOfBlocks = 0;
if (tsdbSetHelperTable(&pQueryHandle->rhelper, pCheckInfo->pTableObj, pQueryHandle->pTsdb) != TSDB_CODE_SUCCESS) {
if (tsdbSetReadTable(&pQueryHandle->rhelper, pCheckInfo->pTableObj) != TSDB_CODE_SUCCESS) {
code = terrno;
break;
}
SBlockIdx* compIndex = &pQueryHandle->rhelper.curCompIdx;
SBlockIdx* compIndex = pQueryHandle->rhelper.pBlkIdx;
// no data block in this file, try next file
if (compIndex->len == 0 || compIndex->numOfBlocks == 0 || compIndex->uid != pCheckInfo->tableId.uid) {
......@@ -742,7 +742,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
pCheckInfo->compSize = compIndex->len;
}
tsdbLoadCompInfo(&(pQueryHandle->rhelper), (void *)(pCheckInfo->pCompInfo));
tsdbLoadBlockInfo(&(pQueryHandle->rhelper), (void *)(pCheckInfo->pCompInfo));
SBlockInfo* pCompInfo = pCheckInfo->pCompInfo;
TSKEY s = TSKEY_INITIAL_VAL, e = TSKEY_INITIAL_VAL;
......@@ -792,14 +792,14 @@ static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBloc
goto _error;
}
code = tdInitDataCols(pQueryHandle->rhelper.pDataCols[0], pSchema);
code = tdInitDataCols(pQueryHandle->rhelper.pDCols[0], pSchema);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("%p failed to malloc buf for rhelper.pDataCols[0], %p", pQueryHandle, pQueryHandle->qinfo);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _error;
}
code = tdInitDataCols(pQueryHandle->rhelper.pDataCols[1], pSchema);
code = tdInitDataCols(pQueryHandle->rhelper.pDCols[1], pSchema);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("%p failed to malloc buf for rhelper.pDataCols[1], %p", pQueryHandle, pQueryHandle->qinfo);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
......@@ -821,7 +821,7 @@ static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBloc
pBlockLoadInfo->slot = pQueryHandle->cur.slot;
pBlockLoadInfo->tid = pCheckInfo->pTableObj->tableId.tid;
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
SDataCols* pCols = pQueryHandle->rhelper.pDCols[0];
assert(pCols->numOfRows != 0 && pCols->numOfRows <= pBlock->numOfRows);
pBlock->numOfRows = pCols->numOfRows;
......@@ -942,7 +942,7 @@ static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBlock,
return code;
}
SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0];
SDataCols* pTSCol = pQueryHandle->rhelper.pDCols[0];
assert(pTSCol->cols->type == TSDB_DATA_TYPE_TIMESTAMP && pTSCol->numOfRows == pBlock->numOfRows);
if (pCheckInfo->lastKey > pBlock->keyFirst) {
......@@ -965,7 +965,7 @@ static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBlock,
return code;
}
SDataCols* pTsCol = pQueryHandle->rhelper.pDataCols[0];
SDataCols* pTsCol = pQueryHandle->rhelper.pDCols[0];
if (pCheckInfo->lastKey < pBlock->keyLast) {
cur->pos = binarySearchForKey(pTsCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pQueryHandle->order);
} else {
......@@ -1050,7 +1050,7 @@ int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity
char* pData = NULL;
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1 : -1;
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
SDataCols* pCols = pQueryHandle->rhelper.pDCols[0];
TSKEY* tsArray = pCols->cols[0].pData;
int32_t num = end - start + 1;
......@@ -1274,7 +1274,7 @@ static void doCheckGeneratedBlockRange(STsdbQueryHandle* pQueryHandle) {
static void copyAllRemainRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SDataBlockInfo* pBlockInfo, int32_t endPos) {
SQueryFilePos* cur = &pQueryHandle->cur;
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
SDataCols* pCols = pQueryHandle->rhelper.pDCols[0];
TSKEY* tsArray = pCols->cols[0].pData;
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1;
......@@ -1317,7 +1317,7 @@ int32_t getEndPosInDataBlock(STsdbQueryHandle* pQueryHandle, SDataBlockInfo* pBl
int32_t order = ASCENDING_TRAVERSE(pQueryHandle->order)? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
SQueryFilePos* cur = &pQueryHandle->cur;
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
SDataCols* pCols = pQueryHandle->rhelper.pDCols[0];
if (ASCENDING_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey >= pBlockInfo->window.ekey) {
endPos = pBlockInfo->rows - 1;
......@@ -1343,7 +1343,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
initTableMemIterator(pQueryHandle, pCheckInfo);
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
SDataCols* pCols = pQueryHandle->rhelper.pDCols[0];
assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == PRIMARYKEY_TIMESTAMP_COL_INDEX &&
cur->pos >= 0 && cur->pos < pBlock->numOfRows);
......@@ -1751,19 +1751,19 @@ static int32_t getFirstFileDataBlock(STsdbQueryHandle* pQueryHandle, bool* exist
STimeWindow win = TSWINDOW_INITIALIZER;
while (true) {
pthread_rwlock_rdlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock);
tsdbRLockFS(REPO_FS(pQueryHandle->pTsdb));
if ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) == NULL) {
pthread_rwlock_unlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock);
if ((pQueryHandle->pFileGroup = tsdbFSIterNext(&pQueryHandle->fileIter)) == NULL) {
tsdbUnLockFS(REPO_FS(pQueryHandle->pTsdb));
break;
}
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pQueryHandle->pFileGroup->fileId, &win.skey, &win.ekey);
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pQueryHandle->pFileGroup->fid, &win.skey, &win.ekey);
// current file are not overlapped with query time window, ignore remain files
if ((ASCENDING_TRAVERSE(pQueryHandle->order) && win.skey > pQueryHandle->window.ekey) ||
(!ASCENDING_TRAVERSE(pQueryHandle->order) && win.ekey < pQueryHandle->window.ekey)) {
pthread_rwlock_unlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock);
tsdbUnLockFS(REPO_FS(pQueryHandle->pTsdb));
tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %p", pQueryHandle,
pQueryHandle->window.skey, pQueryHandle->window.ekey, pQueryHandle->qinfo);
pQueryHandle->pFileGroup = NULL;
......@@ -1771,15 +1771,15 @@ static int32_t getFirstFileDataBlock(STsdbQueryHandle* pQueryHandle, bool* exist
break;
}
if (tsdbSetAndOpenHelperFile(&pQueryHandle->rhelper, pQueryHandle->pFileGroup) < 0) {
pthread_rwlock_unlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock);
if (tsdbSetAndOpenReadFSet(&pQueryHandle->rhelper, pQueryHandle->pFileGroup) < 0) {
tsdbUnLockFS(REPO_FS(pQueryHandle->pTsdb));
code = terrno;
break;
}
pthread_rwlock_unlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock);
tsdbUnLockFS(REPO_FS(pQueryHandle->pTsdb));
if (tsdbLoadCompIdx(&pQueryHandle->rhelper, NULL) < 0) {
if (tsdbLoadBlockIdx(&pQueryHandle->rhelper) < 0) {
code = terrno;
break;
}
......@@ -1789,7 +1789,7 @@ static int32_t getFirstFileDataBlock(STsdbQueryHandle* pQueryHandle, bool* exist
}
tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %p", pQueryHandle, numOfBlocks, numOfTables,
pQueryHandle->pFileGroup->fileId, pQueryHandle->qinfo);
pQueryHandle->pFileGroup->fid, pQueryHandle->qinfo);
assert(numOfBlocks >= 0);
if (numOfBlocks == 0) {
......@@ -1820,7 +1820,7 @@ static int32_t getFirstFileDataBlock(STsdbQueryHandle* pQueryHandle, bool* exist
assert(pQueryHandle->pFileGroup != NULL && pQueryHandle->numOfBlocks > 0);
cur->slot = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:pQueryHandle->numOfBlocks-1;
cur->fid = pQueryHandle->pFileGroup->fileId;
cur->fid = pQueryHandle->pFileGroup->fid;
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
return getDataBlockRv(pQueryHandle, pBlockInfo, exists);
......@@ -1843,7 +1843,7 @@ static void moveToNextDataBlockInCurrentFile(STsdbQueryHandle* pQueryHandle) {
}
static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists) {
STsdbFileH* pFileHandle = tsdbGetFile(pQueryHandle->pTsdb);
STsdbFS* pFileHandle = REPO_FS(pQueryHandle->pTsdb);
SQueryFilePos* cur = &pQueryHandle->cur;
// find the start data block in file
......@@ -1852,10 +1852,10 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists
STsdbCfg* pCfg = &pQueryHandle->pTsdb->config;
int32_t fid = getFileIdFromKey(pQueryHandle->window.skey, pCfg->daysPerFile, pCfg->precision);
pthread_rwlock_rdlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock);
tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, pQueryHandle->order);
tsdbSeekFileGroupIter(&pQueryHandle->fileIter, fid);
pthread_rwlock_unlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock);
tsdbRLockFS(pFileHandle);
tsdbFSIterInit(&pQueryHandle->fileIter, pFileHandle, pQueryHandle->order);
tsdbFSIterSeek(&pQueryHandle->fileIter, fid);
tsdbUnLockFS(pFileHandle);
return getFirstFileDataBlock(pQueryHandle, exists);
} else {
......@@ -2382,7 +2382,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta
}
int64_t stime = taosGetTimestampUs();
tsdbLoadCompData(&pHandle->rhelper, pBlockInfo->compBlock, NULL);
tsdbLoadBlockStatis(&pHandle->rhelper, pBlockInfo->compBlock);
int16_t* colIds = pHandle->defaultLoadColumn->pData;
......@@ -2392,7 +2392,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta
pHandle->statis[i].colId = colIds[i];
}
tsdbGetDataStatis(&pHandle->rhelper, pHandle->statis, (int)numOfCols);
tsdbGetBlockStatis(&pHandle->rhelper, pHandle->statis, (int)numOfCols);
// always load the first primary timestamp column data
SDataStatis* pPrimaryColStatis = &pHandle->statis[0];
......@@ -2444,7 +2444,7 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) {
// data block has been loaded, todo extract method
SDataBlockLoadInfo* pBlockLoadInfo = &pHandle->dataBlockLoadInfo;
if (pBlockLoadInfo->slot == pHandle->cur.slot && pBlockLoadInfo->fileGroup->fileId == pHandle->cur.fid &&
if (pBlockLoadInfo->slot == pHandle->cur.slot && pBlockLoadInfo->fileGroup->fid == pHandle->cur.fid &&
pBlockLoadInfo->tid == pCheckInfo->pTableObj->tableId.tid) {
return pHandle->pColumns;
} else { // only load the file block
......@@ -2923,7 +2923,7 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
// todo check error
tsdbMayUnTakeMemSnapshot(pQueryHandle);
tsdbDestroyHelper(&pQueryHandle->rhelper);
tsdbDestroyReadH(&pQueryHandle->rhelper);
tdFreeDataCols(pQueryHandle->pDataCols);
pQueryHandle->pDataCols = NULL;
......
......@@ -19,10 +19,10 @@
static void tsdbResetReadTable(SReadH *pReadh);
static void tsdbResetReadFile(SReadH *pReadh);
static int tsdbLoadBlockDataImpl(SReadH *pReadh, const SBlock *pBlock, SDataCols *pDataCols);
static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols);
static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32_t len, int8_t comp, int numOfRows,
int maxPoints, char *buffer, int bufferSize);
static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, const SBlock *pBlock, SDataCols *pDataCols, int16_t *colIds,
static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, int16_t *colIds,
int numOfColIds);
static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBlockCol *pBlockCol, SDataCol *pDataCol);
......@@ -240,10 +240,10 @@ int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget) {
return 0;
}
int tsdbLoadBlockData(SReadH *pReadh, const SBlock *pBlock, const SBlockInfo *pBlkInfo) {
int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
ASSERT(pBlock->numOfSubBlocks > 0);
const SBlock *iBlock = pBlock;
SBlock *iBlock = pBlock;
if (pBlock->numOfSubBlocks > 1) {
if (pBlkInfo) {
iBlock = (SBlock *)POINTER_SHIFT(pBlkInfo, pBlock->offset);
......@@ -266,11 +266,10 @@ int tsdbLoadBlockData(SReadH *pReadh, const SBlock *pBlock, const SBlockInfo *pB
return 0;
}
int tsdbLoadBlockDataCols(SReadH *pReadh, const SBlock *pBlock, const SBlockInfo *pBlkInfo, const int16_t *colIds,
int numOfColsIds) {
int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, int16_t *colIds, int numOfColsIds) {
ASSERT(pBlock->numOfSubBlocks > 0);
const SBlock *iBlock = pBlock;
SBlock *iBlock = pBlock;
if (pBlock->numOfSubBlocks > 1) {
if (pBlkInfo) {
iBlock = POINTER_SHIFT(pBlkInfo, pBlock->offset);
......@@ -300,7 +299,7 @@ int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock) {
if (tsdbSeekDFile(pDFile, pBlock->offset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to load block statis part while seek file %s to offset %" PRId64 " since %s",
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), pBlock->offset, tstrerror(terrno));
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, tstrerror(terrno));
return -1;
}
......@@ -310,7 +309,7 @@ int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock) {
int64_t nread = tsdbReadDFile(pDFile, (void *)(pReadh->pBlkData), size);
if (nread < 0) {
tsdbError("vgId:%d failed to load block statis part while read file %s sinces %s, offset:%" PRId64 " len :%" PRIzu,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tstrerror(terrno), pBlock->offset, size);
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tstrerror(terrno), (int64_t)pBlock->offset, size);
return -1;
}
......@@ -318,14 +317,14 @@ int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tsdbError("vgId:%d block statis part in file %s is corrupted, offset:%" PRId64 " expected bytes:%" PRIzu
" read bytes: %" PRId64,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), pBlock->offset, size, nread);
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, size, nread);
return -1;
}
if (!taosCheckChecksumWhole((uint8_t *)(pReadh->pBlkData), size)) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tsdbError("vgId:%d block statis part in file %s is corrupted since wrong checksum, offset:%" PRId64 " len :%" PRIzu,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), pBlock->offset, size);
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, size);
return -1;
}
......@@ -408,7 +407,7 @@ static void tsdbResetReadFile(SReadH *pReadh) {
tsdbCloseDFileSet(TSDB_READ_FSET(pReadh));
}
static int tsdbLoadBlockDataImpl(SReadH *pReadh, const SBlock *pBlock, SDataCols *pDataCols) {
static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols) {
ASSERT(pBlock->numOfSubBlocks >= 0 && pBlock->numOfSubBlocks <= 1);
SDFile *pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_HEAD_FILE(pReadh);
......@@ -420,14 +419,14 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, const SBlock *pBlock, SDataCols
if (tsdbSeekDFile(pDFile, pBlock->offset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to load block data part while seek file %s to offset %" PRId64 " since %s",
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), pBlock->offset, tstrerror(terrno));
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, tstrerror(terrno));
return -1;
}
int64_t nread = tsdbReadDFile(pDFile, TSDB_READ_BUF(pReadh), pBlock->len);
if (nread < 0) {
tsdbError("vgId:%d failed to load block data part while read file %s sinces %s, offset:%" PRId64 " len :%d",
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tstrerror(terrno), pBlock->offset, pBlock->len);
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tstrerror(terrno), (int64_t)pBlock->offset, pBlock->len);
return -1;
}
......@@ -435,7 +434,7 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, const SBlock *pBlock, SDataCols
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tsdbError("vgId:%d block data part in file %s is corrupted, offset:%" PRId64
" expected bytes:%d read bytes: %" PRId64,
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), pBlock->offset, pBlock->len, nread);
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, pBlock->len, nread);
return -1;
}
......@@ -443,7 +442,7 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, const SBlock *pBlock, SDataCols
if (!taosCheckChecksumWhole((uint8_t *)TSDB_READ_BUF(pReadh), tsize)) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
tsdbError("vgId:%d block statis part in file %s is corrupted since wrong checksum, offset:%" PRId64 " len :%d",
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), pBlock->offset, tsize);
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, tsize);
return -1;
}
......@@ -542,7 +541,7 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32
return 0;
}
static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, const SBlock *pBlock, SDataCols *pDataCols, int16_t *colIds,
static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, int16_t *colIds,
int numOfColIds) {
ASSERT(pBlock->numOfSubBlocks <= 1);
ASSERT(colIds[0] == 0);
......@@ -653,7 +652,7 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc
if (tsdbCheckAndDecodeColumnData(pDataCol, pReadh->pBuf, pBlockCol->len, pBlock->algorithm, pBlock->numOfRows,
pCfg->maxRowsPerFileBlock, pReadh->pCBuf, (int32_t)taosTSizeof(pReadh->pCBuf)) < 0) {
tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pRepo), TSDB_FILE_NAME(pDFile),
tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile),
pBlockCol->colId, offset);
return -1;
}
......
......@@ -13,8 +13,9 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdbMain.h"
#include "tsdbint.h"
#if 0
#ifndef _TSDB_PLUGINS
int tsdbScanFGroup(STsdbScanHandle* pScanHandle, char* rootDir, int fid) { return 0; }
......@@ -33,4 +34,5 @@ int tsdbCloseScanFile(STsdbScanHandle* pScanHandle) { return 0; }
void tsdbFreeScanHandle(STsdbScanHandle* pScanHandle) {}
#endif
#endif
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#define TAOS_RANDOM_FILE_FAIL_TEST
#include "tsdbint.h"
static int tdInitKVStoreHeader(int fd, char *fname);
static int tdEncodeStoreInfo(void **buf, SStoreInfo *pInfo);
static void * tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo);
static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH);
static void tdFreeKVStore(SKVStore *pStore);
static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo);
static int tdLoadKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo, uint32_t *version);
static int tdEncodeKVRecord(void **buf, SKVRecord *pRecord);
static void * tdDecodeKVRecord(void *buf, SKVRecord *pRecord);
static int tdRestoreKVStore(SKVStore *pStore);
int tdCreateKVStore(char *fname) {
int fd = open(fname, O_RDWR | O_CREAT, 0755);
if (fd < 0) {
uError("failed to open file %s since %s", fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (tdInitKVStoreHeader(fd, fname) < 0) goto _err;
if (fsync(fd) < 0) {
uError("failed to fsync file %s since %s", fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (close(fd) < 0) {
uError("failed to close file %s since %s", fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
return 0;
_err:
if (fd >= 0) close(fd);
(void)remove(fname);
return -1;
}
int tdDestroyKVStore(char *fname) {
if (remove(fname) < 0) {
uError("failed to remove file %s since %s", fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return 0;
}
SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH) {
SStoreInfo info = {0};
uint32_t version = 0;
SKVStore *pStore = tdNewKVStore(fname, iFunc, aFunc, appH);
if (pStore == NULL) return NULL;
pStore->fd = open(pStore->fname, O_RDWR);
if (pStore->fd < 0) {
uError("failed to open file %s since %s", pStore->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (tdLoadKVStoreHeader(pStore->fd, pStore->fname, &info, &version) < 0) goto _err;
if (version != KVSTORE_FILE_VERSION) {
uError("file %s version %u is not the same as program version %u, this may cause problem", pStore->fname, version,
KVSTORE_FILE_VERSION);
}
pStore->info.size = TD_KVSTORE_HEADER_SIZE;
pStore->info.magic = info.magic;
if (tdRestoreKVStore(pStore) < 0) goto _err;
close(pStore->fd);
pStore->fd = -1;
return pStore;
_err:
if (pStore->fd > 0) {
close(pStore->fd);
pStore->fd = -1;
}
tdFreeKVStore(pStore);
return NULL;
}
void tdCloseKVStore(SKVStore *pStore) { tdFreeKVStore(pStore); }
int tdKVStoreStartCommit(SKVStore *pStore) {
ASSERT(pStore->fd < 0);
pStore->fd = open(pStore->fname, O_RDWR);
if (pStore->fd < 0) {
uError("failed to open file %s since %s", pStore->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (lseek(pStore->fd, 0, SEEK_END) < 0) {
uError("failed to lseek file %s since %s", pStore->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
ASSERT(pStore->info.size == lseek(pStore->fd, 0, SEEK_CUR));
return 0;
_err:
if (pStore->fd > 0) {
close(pStore->fd);
pStore->fd = -1;
}
return -1;
}
int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLen) {
SKVRecord rInfo = {0};
char buf[64] = "\0";
char * pBuf = buf;
rInfo.offset = lseek(pStore->fd, 0, SEEK_CUR);
if (rInfo.offset < 0) {
uError("failed to lseek file %s since %s", pStore->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
rInfo.uid = uid;
rInfo.size = contLen;
int tlen = tdEncodeKVRecord((void *)(&pBuf), &rInfo);
ASSERT(tlen == POINTER_DISTANCE(pBuf, buf));
ASSERT(tlen == sizeof(SKVRecord));
if (taosWrite(pStore->fd, buf, tlen) < tlen) {
uError("failed to write %d bytes to file %s since %s", tlen, pStore->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (taosWrite(pStore->fd, cont, contLen) < contLen) {
uError("failed to write %d bytes to file %s since %s", contLen, pStore->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
pStore->info.magic =
taosCalcChecksum(pStore->info.magic, (uint8_t *)POINTER_SHIFT(cont, contLen - sizeof(TSCKSUM)), sizeof(TSCKSUM));
pStore->info.size += (sizeof(SKVRecord) + contLen);
SKVRecord *pRecord = taosHashGet(pStore->map, (void *)&uid, sizeof(uid));
if (pRecord != NULL) { // just to insert
pStore->info.tombSize += pRecord->size;
} else {
pStore->info.nRecords++;
}
taosHashPut(pStore->map, (void *)(&uid), sizeof(uid), (void *)(&rInfo), sizeof(rInfo));
uTrace("put uid %" PRIu64 " into kvStore %s", uid, pStore->fname);
return 0;
}
int tdDropKVStoreRecord(SKVStore *pStore, uint64_t uid) {
SKVRecord rInfo = {0};
char buf[128] = "\0";
SKVRecord *pRecord = taosHashGet(pStore->map, (void *)(&uid), sizeof(uid));
if (pRecord == NULL) {
uError("failed to drop KV store record with key %" PRIu64 " since not find", uid);
return -1;
}
rInfo.offset = -pRecord->offset;
rInfo.uid = pRecord->uid;
rInfo.size = pRecord->size;
void *pBuf = buf;
tdEncodeKVRecord(&pBuf, &rInfo);
if (taosWrite(pStore->fd, buf, POINTER_DISTANCE(pBuf, buf)) < POINTER_DISTANCE(pBuf, buf)) {
uError("failed to write %" PRId64 " bytes to file %s since %s", (int64_t)(POINTER_DISTANCE(pBuf, buf)), pStore->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
pStore->info.magic = taosCalcChecksum(pStore->info.magic, (uint8_t *)buf, (uint32_t)POINTER_DISTANCE(pBuf, buf));
pStore->info.size += POINTER_DISTANCE(pBuf, buf);
pStore->info.nDels++;
pStore->info.nRecords--;
pStore->info.tombSize += (rInfo.size + sizeof(SKVRecord) * 2);
taosHashRemove(pStore->map, (void *)(&uid), sizeof(uid));
uDebug("drop uid %" PRIu64 " from KV store %s", uid, pStore->fname);
return 0;
}
int tdKVStoreEndCommit(SKVStore *pStore) {
ASSERT(pStore->fd > 0);
if (tdUpdateKVStoreHeader(pStore->fd, pStore->fname, &(pStore->info)) < 0) return -1;
if (fsync(pStore->fd) < 0) {
uError("failed to fsync file %s since %s", pStore->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (close(pStore->fd) < 0) {
uError("failed to close file %s since %s", pStore->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
pStore->fd = -1;
return 0;
}
void tsdbGetStoreInfo(char *fname, uint32_t *magic, int64_t *size) {
char buf[TD_KVSTORE_HEADER_SIZE] = "\0";
SStoreInfo info = {0};
int fd = open(fname, O_RDONLY);
if (fd < 0) goto _err;
if (taosRead(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) goto _err;
if (!taosCheckChecksumWhole((uint8_t *)buf, TD_KVSTORE_HEADER_SIZE)) goto _err;
void *pBuf = (void *)buf;
pBuf = tdDecodeStoreInfo(pBuf, &info);
off_t offset = lseek(fd, 0, SEEK_END);
if (offset < 0) goto _err;
close(fd);
*magic = info.magic;
*size = offset;
return;
_err:
if (fd >= 0) close(fd);
*magic = TD_KVSTORE_INIT_MAGIC;
*size = 0;
}
static int tdLoadKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo, uint32_t *version) {
char buf[TD_KVSTORE_HEADER_SIZE] = "\0";
if (lseek(fd, 0, SEEK_SET) < 0) {
uError("failed to lseek file %s since %s", fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (taosRead(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) {
uError("failed to read %d bytes from file %s since %s", TD_KVSTORE_HEADER_SIZE, fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (!taosCheckChecksumWhole((uint8_t *)buf, TD_KVSTORE_HEADER_SIZE)) {
uError("file %s is broken", fname);
terrno = TSDB_CODE_COM_FILE_CORRUPTED;
return -1;
}
void *pBuf = (void *)buf;
pBuf = tdDecodeStoreInfo(pBuf, pInfo);
pBuf = taosDecodeFixedU32(pBuf, version);
return 0;
}
static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo) {
char buf[TD_KVSTORE_HEADER_SIZE] = "\0";
if (lseek(fd, 0, SEEK_SET) < 0) {
uError("failed to lseek file %s since %s", fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
void *pBuf = buf;
tdEncodeStoreInfo(&pBuf, pInfo);
taosEncodeFixedU32(&pBuf, KVSTORE_FILE_VERSION);
ASSERT(POINTER_DISTANCE(pBuf, buf) + sizeof(TSCKSUM) <= TD_KVSTORE_HEADER_SIZE);
taosCalcChecksumAppend(0, (uint8_t *)buf, TD_KVSTORE_HEADER_SIZE);
if (taosWrite(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) {
uError("failed to write %d bytes to file %s since %s", TD_KVSTORE_HEADER_SIZE, fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return 0;
}
static int tdInitKVStoreHeader(int fd, char *fname) {
SStoreInfo info = {TD_KVSTORE_HEADER_SIZE, 0, 0, 0, TD_KVSTORE_INIT_MAGIC};
return tdUpdateKVStoreHeader(fd, fname, &info);
}
static int tdEncodeStoreInfo(void **buf, SStoreInfo *pInfo) {
int tlen = 0;
tlen += taosEncodeVariantI64(buf, pInfo->size);
tlen += taosEncodeVariantI64(buf, pInfo->tombSize);
tlen += taosEncodeVariantI64(buf, pInfo->nRecords);
tlen += taosEncodeVariantI64(buf, pInfo->nDels);
tlen += taosEncodeFixedU32(buf, pInfo->magic);
return tlen;
}
static void *tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo) {
buf = taosDecodeVariantI64(buf, &(pInfo->size));
buf = taosDecodeVariantI64(buf, &(pInfo->tombSize));
buf = taosDecodeVariantI64(buf, &(pInfo->nRecords));
buf = taosDecodeVariantI64(buf, &(pInfo->nDels));
buf = taosDecodeFixedU32(buf, &(pInfo->magic));
return buf;
}
static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH) {
SKVStore *pStore = (SKVStore *)calloc(1, sizeof(SKVStore));
if (pStore == NULL) goto _err;
pStore->fname = strdup(fname);
if (pStore->fname == NULL) {
terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
goto _err;
}
pStore->fd = -1;
pStore->iFunc = iFunc;
pStore->aFunc = aFunc;
pStore->appH = appH;
pStore->map = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
if (pStore->map == NULL) {
terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
goto _err;
}
return pStore;
_err:
tdFreeKVStore(pStore);
return NULL;
}
static void tdFreeKVStore(SKVStore *pStore) {
if (pStore) {
tfree(pStore->fname);
taosHashCleanup(pStore->map);
free(pStore);
}
}
static int tdEncodeKVRecord(void **buf, SKVRecord *pRecord) {
int tlen = 0;
tlen += taosEncodeFixedU64(buf, pRecord->uid);
tlen += taosEncodeFixedI64(buf, pRecord->offset);
tlen += taosEncodeFixedI64(buf, pRecord->size);
return tlen;
}
static void *tdDecodeKVRecord(void *buf, SKVRecord *pRecord) {
buf = taosDecodeFixedU64(buf, &(pRecord->uid));
buf = taosDecodeFixedI64(buf, &(pRecord->offset));
buf = taosDecodeFixedI64(buf, &(pRecord->size));
return buf;
}
static int tdRestoreKVStore(SKVStore *pStore) {
char tbuf[128] = "\0";
void * buf = NULL;
int64_t maxBufSize = 0;
SKVRecord rInfo = {0};
SKVRecord *pRecord = NULL;
ASSERT(TD_KVSTORE_HEADER_SIZE == lseek(pStore->fd, 0, SEEK_CUR));
ASSERT(pStore->info.size == TD_KVSTORE_HEADER_SIZE);
while (true) {
int64_t tsize = taosRead(pStore->fd, tbuf, sizeof(SKVRecord));
if (tsize == 0) break;
if (tsize < sizeof(SKVRecord)) {
uError("failed to read %" PRIzu " bytes from file %s at offset %" PRId64 "since %s", sizeof(SKVRecord), pStore->fname,
pStore->info.size, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
char *pBuf = tdDecodeKVRecord(tbuf, &rInfo);
ASSERT(POINTER_DISTANCE(pBuf, tbuf) == sizeof(SKVRecord));
ASSERT((rInfo.offset > 0) ? (pStore->info.size == rInfo.offset) : true);
if (rInfo.offset < 0) {
taosHashRemove(pStore->map, (void *)(&rInfo.uid), sizeof(rInfo.uid));
pStore->info.size += sizeof(SKVRecord);
pStore->info.nRecords--;
pStore->info.nDels++;
pStore->info.tombSize += (rInfo.size + sizeof(SKVRecord) * 2);
} else {
ASSERT(rInfo.offset > 0 && rInfo.size > 0);
if (taosHashPut(pStore->map, (void *)(&rInfo.uid), sizeof(rInfo.uid), &rInfo, sizeof(rInfo)) < 0) {
uError("failed to put record in KV store %s", pStore->fname);
terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
goto _err;
}
maxBufSize = MAX(maxBufSize, rInfo.size);
if (lseek(pStore->fd, (off_t)rInfo.size, SEEK_CUR) < 0) {
uError("failed to lseek file %s since %s", pStore->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
pStore->info.size += (sizeof(SKVRecord) + rInfo.size);
pStore->info.nRecords++;
}
}
buf = malloc((size_t)maxBufSize);
if (buf == NULL) {
uError("failed to allocate %" PRId64 " bytes in KV store %s", maxBufSize, pStore->fname);
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
pRecord = taosHashIterate(pStore->map, NULL);
while (pRecord) {
if (lseek(pStore->fd, (off_t)(pRecord->offset + sizeof(SKVRecord)), SEEK_SET) < 0) {
uError("failed to lseek file %s since %s, offset %" PRId64, pStore->fname, strerror(errno), pRecord->offset);
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (taosRead(pStore->fd, buf, (size_t)pRecord->size) < pRecord->size) {
uError("failed to read %" PRId64 " bytes from file %s since %s, offset %" PRId64, pRecord->size, pStore->fname,
strerror(errno), pRecord->offset);
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (pStore->iFunc) {
if ((*pStore->iFunc)(pStore->appH, buf, (int)pRecord->size) < 0) {
uError("failed to restore record uid %" PRIu64 " in kv store %s at offset %" PRId64 " size %" PRId64
" since %s",
pRecord->uid, pStore->fname, pRecord->offset, pRecord->size, tstrerror(terrno));
goto _err;
}
}
pRecord = taosHashIterate(pStore->map, pRecord);
}
if (pStore->aFunc) (*pStore->aFunc)(pStore->appH);
tfree(buf);
return 0;
_err:
taosHashCancelIterate(pStore->map, pRecord);
tfree(buf);
return -1;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册