提交 d8e0d05a 编写于 作者: H Hongze Cheng

refactor done

上级 5f825cf8
......@@ -14,15 +14,18 @@
*/
#ifndef _TD_TSDB_MAIN_H_
#define _TD_TSDB_MAIN_H_
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include "os.h"
#include "hash.h"
#include "os.h"
#include "tcoding.h"
#include "tglobal.h"
#include "tkvstore.h"
#include "tlist.h"
#include "tlog.h"
#include "tlockfree.h"
#include "tlog.h"
#include "tsdb.h"
#include "tskiplist.h"
#include "tutil.h"
......@@ -33,7 +36,9 @@ extern "C" {
extern int tsdbDebugFlag;
#define tsdbFatal(...) { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TDB FATAL ", 255, __VA_ARGS__); }}
#define tsdbFatal(...) \
{ \
if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TDB FATAL ", 255, __VA_ARGS__); }}
#define tsdbError(...) { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TDB ERROR ", 255, __VA_ARGS__); }}
#define tsdbWarn(...) { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TDB WARN ", 255, __VA_ARGS__); }}
#define tsdbInfo(...) { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TDB ", 255, __VA_ARGS__); }}
......@@ -467,7 +472,7 @@ void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TS
#define IS_REPO_LOCKED(r) (r)->repoLocked
#define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg)
int tsdbGetFileName(char* rootDir, int type, int vid, int fid, int seq, char** fname);
int tsdbGetFileName(char* rootDir, int type, int vid, int fid, int seq, char* fname);
int tsdbLockRepo(STsdbRepo* pRepo);
int tsdbUnlockRepo(STsdbRepo* pRepo);
char* tsdbGetDataDirName(char* rootDir);
......@@ -502,7 +507,7 @@ int tsdbSetAndOpenReadFGroup(SReadHandle* pReadH, SFileGroup* pFGroup);
void tsdbCloseAndUnsetReadFile(SReadHandle* pReadH);
int tsdbLoadBlockIdx(SReadHandle* pReadH);
int tsdbSetReadTable(SReadHandle* pReadH, STable* pTable);
int tsdbLoadBlockInfo(SReadHandle* pReadH);
int tsdbLoadBlockInfo(SReadHandle* pReadH, void* pMem);
int tsdbLoadBlockData(SReadHandle* pReadH, SBlock* pBlock, SBlockInfo* pBlockInfo);
int tsdbLoadBlockDataCols(SReadHandle* pReadH, SBlock* pBlock, SBlockInfo* pBlockInfo, int16_t* colIds, int numOfCols);
int tsdbLoadBlockDataInfo(SReadHandle* pReadH, SBlock* pBlock);
......@@ -532,6 +537,19 @@ int tsdbEncodeBlockIdx(void** buf, SBlockIdx* pBlockIdx);
void* tsdbDecodeBlockIdx(void* buf, SBlockIdx* pBlockIdx);
int tsdbLoadKeyCol(SReadHandle* pReadH, SBlockInfo* pBlockInfo, SBlock* pBlock);
int tsdbGetCurrMinFid(int8_t precision, int32_t keep, int32_t days);
#define TSDB_DATA_DIR_NAME "data"
void *tsdbCommitData(void *arg);
void tsdbGetDataStatis(SReadHandle *pReadH, SDataStatis *pStatis, int numOfCols);
static FORCE_INLINE int compTSKEY(const void* key1, const void* key2) {
if (*(TSKEY*)key1 > *(TSKEY*)key2) {
return 1;
} else if (*(TSKEY*)key1 == *(TSKEY*)key2) {
return 0;
} else {
return -1;
}
}
#ifdef __cplusplus
}
......
......@@ -15,6 +15,7 @@
#include <errno.h>
#include <fcntl.h>
#include <inttypes.h>
#include <limits.h>
#include <sys/stat.h>
#include <sys/types.h>
......@@ -85,7 +86,7 @@ static int tsdbApplyFileChange(STsdbFileChange *pChange, bool isCom
static void tsdbSeekTSCommitHandle(STSCommitHandle *pTSCh, TSKEY key);
static int tsdbEncodeSFileGroup(void **buf, SFileGroup *pFGroup);
static void * tsdbDecodeSFileGroup(void *buf, SFileGroup *pFGroup);
static void tsdbGetNextCommitFileGroup(STsdbRepo *pRepo, int vid, SFileGroup *pOldGroup, SFileGroup *pNewGroup);
static void tsdbGetNextCommitFileGroup(STsdbRepo *pRepo, SFileGroup *pOldGroup, SFileGroup *pNewGroup);
static int tsdbCommitTableData(STSCommitHandle *pTSCh, int tid);
static int tsdbWriteBlockToRightFile(STSCommitHandle *pTSCh, SDataCols *pDataCols, SBlock *pBlock);
static int tsdbSetAndOpenCommitFGroup(STSCommitHandle *pTSCh, SFileGroup *pOldGroup, SFileGroup *pNewGroup);
......@@ -111,7 +112,8 @@ static int tsdbMergeDataBlock(STSCommitHandle *pTSCh, SBlock *pBlock);
static void tsdbLoadMergeFromCache(STSCommitHandle *pTSCh, TSKEY maxKey);
static int tsdbInsertSubBlock(STSCommitHandle *pTSCh, SBlock *pBlock);
int tsdbCommitData(STsdbRepo *pRepo) {
void *tsdbCommitData(void *arg) {
STsdbRepo *pRepo = (STsdbRepo *)arg;
ASSERT(pRepo->commit == 1 && pRepo->imem != NULL);
SCommitHandle commitHandle = {0};
......@@ -119,20 +121,20 @@ int tsdbCommitData(STsdbRepo *pRepo) {
pCommitH->pRepo = pRepo;
if (tsdbStartCommit(pCommitH) < 0) return -1;
if (tsdbStartCommit(pCommitH) < 0) return NULL;
if (tsdbCommitTimeSeriesData(pCommitH) < 0) {
tsdbEndCommit(pCommitH, true);
return -1;
return NULL;
}
if (tsdbCommitMetaData(pCommitH) < 0) {
tsdbEndCommit(pCommitH, true);
return -1;
return NULL;
}
tsdbEndCommit(pCommitH, false);
return 0;
return NULL;
}
static int tsdbStartCommit(SCommitHandle *pCommitH) {
......@@ -151,7 +153,7 @@ static int tsdbStartCommit(SCommitHandle *pCommitH) {
pCommitH->fd = -1;
tsdbGetFileName(pRepo->rootDir, TSDB_FILE_TYPE_MANIFEST, pCfg->tsdbId, 0, 0, &(pCommitH->fname));
tsdbGetFileName(pRepo->rootDir, TSDB_FILE_TYPE_MANIFEST, pCfg->tsdbId, 0, 0, pCommitH->fname);
pCommitH->fd = open(pCommitH->fname, O_CREAT | O_WRONLY | O_APPEND, 0755);
if (pCommitH->fd < 0) {
tsdbError("vgId:%d failed to open file %s since %s", REPO_ID(pRepo), pCommitH->fname, strerror(errno));
......@@ -538,7 +540,7 @@ static int tsdbLogTSFileChange(SCommitHandle *pCommitH, int fid) {
pDataFileChange->ofgroup = *pFGroup;
}
tsdbGetNextCommitFileGroup(&(pDataFileChange->ofgroup), &(pDataFileChange->nfgroup));
tsdbGetNextCommitFileGroup(pRepo, &(pDataFileChange->ofgroup), &(pDataFileChange->nfgroup));
if (tsdbLogFileChange(pCommitH, pChange) < 0) {
free(pNode);
......@@ -560,7 +562,7 @@ static int tsdbLogMetaFileChange(SCommitHandle *pCommitH) {
return -1;
}
STsdbFileChange *pChange = pNode->data;
STsdbFileChange *pChange = (STsdbFileChange *)pNode->data;
pChange->type = TSDB_META_FILE_CHANGE;
SMetaFileChange *pMetaChange = (SMetaFileChange *)(pChange->change);
......@@ -600,7 +602,7 @@ static int tsdbLogRetentionChange(SCommitHandle *pCommitH, int mfid) {
free(pNode);
return -1;
}
tdListAppendNode(pCommitH->pModLog, &pChange);
tdListAppendNode(pCommitH->pModLog, pNode);
} else {
break;
}
......@@ -653,20 +655,21 @@ static int tsdbEncodeSFileGroup(void **buf, SFileGroup *pFGroup) {
return tsize;
}
static void *tsdbDecodeSFileGroup(void *buf, SFileGroup *pFGroup) {
static UNUSED_FUNC void *tsdbDecodeSFileGroup(void *buf, SFileGroup *pFGroup) {
buf = taosDecodeVariantI32(buf, &(pFGroup->fileId));
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
SFile *pFile = &(pFGroup->files[type]);
char *fname = pFile->fname;
buf = taosDecodeString(buf, &(pFile->fname));
buf = taosDecodeString(buf, &fname);
buf = tsdbDecodeSFileInfo(buf, &(pFile->info));
}
return buf;
}
static void tsdbGetNextCommitFileGroup(STsdbRepo *pRepo, int vid, SFileGroup *pOldGroup, SFileGroup *pNewGroup) {
static void tsdbGetNextCommitFileGroup(STsdbRepo *pRepo, SFileGroup *pOldGroup, SFileGroup *pNewGroup) {
pNewGroup->fileId = pOldGroup->fileId;
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
......@@ -675,9 +678,9 @@ static void tsdbGetNextCommitFileGroup(STsdbRepo *pRepo, int vid, SFileGroup *pO
size_t len =strlen(pOldFile->fname);
if (len == 0 || pOldFile->fname[len - 1] == '1') {
tsdbGetFileName(pRepo->rootDir, type, vid, pOldGroup->fileId, 0, pNewFile->fname);
tsdbGetFileName(pRepo->rootDir, type, REPO_ID(pRepo), pOldGroup->fileId, 0, pNewFile->fname);
} else {
tsdbGetFileName(pRepo->rootDir, type, vid, pOldGroup->fileId, 1, pNewFile->fname);
tsdbGetFileName(pRepo->rootDir, type, REPO_ID(pRepo), pOldGroup->fileId, 1, pNewFile->fname);
}
}
}
......@@ -685,7 +688,6 @@ static void tsdbGetNextCommitFileGroup(STsdbRepo *pRepo, int vid, SFileGroup *pO
static int tsdbCommitTableData(STSCommitHandle *pTSCh, int tid) {
SCommitIter *pIter = pTSCh->pIters + tid;
SReadHandle *pReadH = pTSCh->pReadH;
SDataCols * pDataCols = pTSCh->pDataCols;
TSKEY keyNext = tsdbNextIterKey(pIter->pIter);
taosRLockLatch(&(pIter->pTable->latch));
......@@ -701,7 +703,7 @@ static int tsdbCommitTableData(STSCommitHandle *pTSCh, int tid) {
return 0;
}
if (tsdbLoadBlockInfo(pReadH) < 0) {
if (tsdbLoadBlockInfo(pReadH, NULL) < 0) {
taosRUnLockLatch(&(pIter->pTable->latch));
return -1;
}
......@@ -789,8 +791,8 @@ static void tsdbCloseAndUnsetCommitFGroup(STSCommitHandle *pTSCh, bool hasError)
tsdbCloseAndUnsetReadFile(pTSCh->pReadH);
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
SFile *pOldFile = TSDB_FILE_IN_FGROUP(pOldGroup, type);
SFile *pNewFile = TSDB_FILE_IN_FGROUP(pNewGroup, type);
// SFile *pOldFile = TSDB_FILE_IN_FGROUP(pOldGroup, type);
SFile *pNewFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, type);
if (pNewFile->fd >= 0) {
if (!hasError) {
......@@ -805,16 +807,17 @@ static void tsdbCloseAndUnsetCommitFGroup(STSCommitHandle *pTSCh, bool hasError)
static int tsdbWriteBlockInfo(STSCommitHandle *pTSCh) {
ASSERT(pTSCh->nBlocks > 0);
SReadHandle *pReadH = pTSCh->pReadH;
STsdbRepo * pRepo = pReadH->pRepo;
SBlockInfo * pBlockInfo = pTSCh->pBlockInfo;
SFile * pFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_HEAD);
int tlen = TSDB_BLOCK_INFO_LEN(pTSCh->nBlocks, pTSCh->nSubBlocks);
int tlen = TSDB_BLOCK_INFO_LEN(pTSCh->nBlocks+pTSCh->nSubBlocks);
pBlockInfo->delimiter = TSDB_FILE_DELIMITER;
pBlockInfo->uid = TABLE_UID(pReadH->pTable);
pBlockInfo->tid = TABLE_TID(pReadH->pTable);
if (pTSCh->nSubBlocks > 0) {
if (tsdbAllocBuf(&(pTSCh->pBlockInfo), tlen) < 0) {
if (tsdbAllocBuf((void **)(&(pTSCh->pBlockInfo)), tlen) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
......@@ -866,7 +869,7 @@ static int tsdbWriteBlockIdx(STSCommitHandle *pTSCh) {
// label checksum
len += sizeof(TSCKSUM);
if (tsdbAllocBuf(&(pReadH->pBuf), len) < 0) {
if (tsdbAllocBuf((void **)(&(pReadH->pBuf)), len) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
......@@ -891,8 +894,8 @@ static int tsdbWriteBlockIdx(STSCommitHandle *pTSCh) {
pFile->info.size += len;
pFile->info.offset = (uint32_t)offset;
pFile->info.len = len;
pFile->info.magic = taosCalcChecksum(pFile->info.magic,
(uint8_t *)POINTER_SHIFT(pReadH->pBuf, len - sizeof(TSCKSUM), sizeof(TSCKSUM)));
pFile->info.magic = taosCalcChecksum(pFile->info.magic, (uint8_t *)POINTER_SHIFT(pReadH->pBuf, len - sizeof(TSCKSUM)),
sizeof(TSCKSUM));
ASSERT(pFile->info.size == pFile->info.offset + pFile->info.len);
......@@ -912,7 +915,6 @@ static int tsdbSetCommitTable(STSCommitHandle *pTSCh, STable *pTable) {
static int tsdbCommitTableDataImpl(STSCommitHandle *pTSCh, int tid) {
SCommitIter *pIter = pTSCh->pIters + tid;
SReadHandle *pReadH = pTSCh->pReadH;
SDataCols * pDataCols = pTSCh->pDataCols;
SBlockIdx * pOldIdx = pReadH->pCurBlockIdx;
TSKEY keyNext = tsdbNextIterKey(pIter->pIter);
......@@ -1025,7 +1027,7 @@ static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, int ftype, SDataCols *pD
ASSERT(pDataCols->numOfRows > 0 && pDataCols->numOfRows <= pCfg->maxRowsPerFileBlock);
ASSERT(isLast ? pDataCols->numOfRows < pCfg->minRowsPerFileBlock : true);
if (tsdbAllocBuf(&(pReadH->pBlockData), csize) < 0) {
if (tsdbAllocBuf((void **)(&(pReadH->pBlockData)), csize) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
......@@ -1042,7 +1044,7 @@ static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, int ftype, SDataCols *pD
nColsNotAllNull++;
csize = TSDB_BLOCK_DATA_LEN(nColsNotAllNull);
if (tsdbAllocBuf(&(pReadH->pBlockData), csize) < 0) {
if (tsdbAllocBuf((void **)(&(pReadH->pBlockData)), csize) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
......@@ -1065,7 +1067,7 @@ static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, int ftype, SDataCols *pD
int32_t blen = olen + COMP_OVERFLOW_BYTES; // allocated buffer length
int32_t clen = 0;
if (tsdbAllocBuf(&(pReadH->pBuf), coffset + blen + sizeof(TSCKSUM)) < 0) {
if (tsdbAllocBuf((void **)(&(pReadH->pBuf)), coffset + blen + sizeof(TSCKSUM)) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
......@@ -1074,7 +1076,7 @@ static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, int ftype, SDataCols *pD
if (pCfg->compression) {
if (pCfg->compression == TWO_STAGE_COMP) {
if (tsdbAllocBuf(&(pReadH->pCBuf), blen) < 0) {
if (tsdbAllocBuf((void **)(&(pReadH->pCBuf)), blen) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
......@@ -1084,7 +1086,7 @@ static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, int ftype, SDataCols *pD
blen, pCfg->compression, pReadH->pCBuf, blen);
} else {
clen = olen;
memcpy(pData, olen);
memcpy(pData, pDataCol->pData, olen);
}
ASSERT(clen > 0 && clen <= blen);
......@@ -1155,7 +1157,7 @@ static int tsdbEncodeBlockIdxArray(STSCommitHandle *pTSCh) {
for (int i = 0; i < pTSCh->nBlockIdx; i++) {
int tlen = tsdbEncodeBlockIdx(NULL, pTSCh->pBlockIdx + i);
if (tsdbAllocBuf(&(pReadH->pBuf), tlen + len) < 0) {
if (tsdbAllocBuf((void **)(&(pReadH->pBuf)), tlen + len) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
......@@ -1179,7 +1181,7 @@ static int tsdbUpdateFileGroupInfo(SFileGroup *pFileGroup) {
}
static int tsdbAppendBlockIdx(STSCommitHandle *pTSCh) {
if (tsdbAllocBuf(&(pTSCh->pBlockIdx), sizeof(SBlockIdx) * (pTSCh->nBlockIdx + 1)) < 0) {
if (tsdbAllocBuf((void **)(&(pTSCh->pBlockIdx)), sizeof(SBlockIdx) * (pTSCh->nBlockIdx + 1)) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
......@@ -1245,7 +1247,7 @@ static int tsdbAddSuperBlock(STSCommitHandle *pTSCh, SBlock *pBlock) {
ASSERT(pBlock->numOfSubBlocks > 0);
int tsize = TSDB_BLOCK_INFO_LEN(pTSCh->nBlocks + 1);
if (tsdbAllocBuf(&(pTSCh->pBlockInfo), tsize) < 0) {
if (tsdbAllocBuf((void **)(&(pTSCh->pBlockInfo)), tsize) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
......@@ -1260,7 +1262,7 @@ static int tsdbAddSubBlocks(STSCommitHandle *pTSCh, SBlock *pBlocks, int nBlocks
int tBlocks = pTSCh->nSubBlocks + nBlocks;
int tsize = sizeof(SBlock) * tBlocks;
if (tsdbAllocBuf(&(pTSCh->pSubBlock), tsize) < 0) {
if (tsdbAllocBuf((void **)(&(pTSCh->pSubBlock)), tsize) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
......@@ -1280,7 +1282,6 @@ static int tsdbMergeLastBlock(STSCommitHandle *pTSCh, SBlock *pBlock) {
SCommitIter *pIter = pTSCh->pIters + TABLE_TID(pTable);
int dbrows = TSDB_DEFAULT_ROWS_TO_COMMIT(pCfg->maxRowsPerFileBlock);
SBlock newBlock = {0};
SFile * pFile = NULL;
TSKEY keyNext = tsdbNextIterKey(pIter->pIter);
if (keyNext > pBlock->keyLast) { // append merge last block
......@@ -1373,7 +1374,7 @@ static int tsdbMergeDataBlock(STSCommitHandle *pTSCh, SBlock *pBlock) {
}
// Commit data to keyLimit included
if (tsdbLoadKeyCol(pReadH, pBlock, NULL) < 0) return -1;
if (tsdbLoadKeyCol(pReadH, NULL, pBlock) < 0) return -1;
rows = tsdbLoadDataFromCache(pIter->pTable, &titer, pBlock->keyLast, INT32_MAX, NULL,
pReadH->pDataCols[0]->cols[0].pData, pBlock->numOfRows);
......
......@@ -13,8 +13,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include <dirent.h>
#include <errno.h>
#include <inttypes.h>
#include <regex.h>
#include <sys/types.h>
#define TAOS_RANDOM_FILE_FAIL_TEST
......@@ -69,6 +72,7 @@ void tsdbFreeFileH(STsdbFileH *pFileH) {
}
}
// TODO: refactor this function
int tsdbOpenFileH(STsdbRepo *pRepo) {
ASSERT(pRepo != NULL && pRepo->tsdbFileH != NULL);
......@@ -125,7 +129,7 @@ int tsdbOpenFileH(STsdbRepo *pRepo) {
if (fid < mfid) {
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
tsdbGetFileName(pRepo->rootDir, pCfg->tsdbId, fid, type, fname);
tsdbGetFileName(pRepo->rootDir, type, pCfg->tsdbId, fid, 0, fname);
(void)remove(fname);
}
continue;
......@@ -342,7 +346,7 @@ int tsdbCreateFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) {
memset((void *)pFile, 0, sizeof(SFile));
pFile->fd = -1;
tsdbGetFileName(pRepo->rootDir, REPO_ID(pRepo), fid, type, pFile->fname);
tsdbGetFileName(pRepo->rootDir, type, REPO_ID(pRepo), fid, type, pFile->fname);
if (access(pFile->fname, F_OK) == 0) {
tsdbError("vgId:%d file %s already exists", REPO_ID(pRepo), pFile->fname);
......@@ -525,7 +529,7 @@ int tsdbGetCurrMinFid(int8_t precision, int32_t keep, int32_t days) {
static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) {
uint32_t version;
tsdbGetFileName(pRepo->rootDir, REPO_ID(pRepo), fid, type, pFile->fname);
tsdbGetFileName(pRepo->rootDir, type, REPO_ID(pRepo), fid, 0, pFile->fname);
pFile->fd = -1;
if (tsdbOpenFile(pFile, O_RDONLY) < 0) goto _err;
......
......@@ -12,6 +12,10 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <dirent.h>
#include <errno.h>
#include <inttypes.h>
#include <sys/types.h>
// no test file errors here
#include "tsdbMain.h"
......@@ -212,7 +216,7 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_
// STsdbMeta *pMeta = pRepo->tsdbMeta;
STsdbFileH *pFileH = pRepo->tsdbFileH;
uint32_t magic = 0;
char * fname = NULL;
char fname[TSDB_FILENAME_LEN] = "\0";
struct stat fState;
......@@ -229,7 +233,7 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_
if (pFileH->nFGroups == 0 || fid > pFileH->pFGroup[pFileH->nFGroups - 1].fileId) {
if (*index <= TSDB_META_FILE_INDEX && TSDB_META_FILE_INDEX <= eindex) {
tsdbGetFileName(pRepo->rootDir, TSDB_FILE_TYPE_META, 0, 0, 0, &fname);
tsdbGetFileName(pRepo->rootDir, TSDB_FILE_TYPE_META, 0, 0, 0, fname);
*index = TSDB_META_FILE_INDEX;
magic = TSDB_META_FILE_MAGIC(pRepo->tsdbMeta);
} else {
......@@ -239,11 +243,11 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_
SFileGroup *pFGroup =
taosbsearch(&fid, pFileH->pFGroup, pFileH->nFGroups, sizeof(SFileGroup), keyFGroupCompFunc, TD_GE);
if (pFGroup->fileId == fid) {
fname = strdup(pFGroup->files[(*index) % TSDB_FILE_TYPE_MAX].fname);
strncpy(fname, pFGroup->files[(*index) % TSDB_FILE_TYPE_MAX].fname, TSDB_FILENAME_LEN);
magic = pFGroup->files[(*index) % TSDB_FILE_TYPE_MAX].info.magic;
} else {
if ((pFGroup->fileId + 1) * TSDB_FILE_TYPE_MAX - 1 < (int)eindex) {
fname = strdup(pFGroup->files[0].fname);
strncpy(fname, pFGroup->files[0].fname, TSDB_FILENAME_LEN);
*index = pFGroup->fileId * TSDB_FILE_TYPE_MAX;
magic = pFGroup->files[0].info.magic;
} else {
......@@ -253,10 +257,8 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_
}
strcpy(name, fname + prefixLen);
} else { // get the named file at the specified index. If not there, return 0
fname = malloc(prefixLen + strlen(name) + 2);
sprintf(fname, "%s/%s", prefix, name);
if (access(fname, F_OK) != 0) {
taosFree(fname);
taosFree(sdup);
return 0;
}
......@@ -265,20 +267,17 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_
} else {
tsdbGetFileInfoImpl(fname, &magic, size);
}
taosFree(fname);
taosFree(sdup);
return magic;
}
if (stat(fname, &fState) < 0) {
taosTFree(fname);
return 0;
}
*size = fState.st_size;
// magic = *size;
taosTFree(fname);
return magic;
}
......@@ -517,15 +516,14 @@ static int32_t tsdbSetRepoEnv(char *rootDir, STsdbCfg *pCfg) {
free(dirName);
char *fname = tsdbGetMetaFileName(rootDir);
char fname[TSDB_FILENAME_LEN] = "\0";
tsdbGetFileName(rootDir, TSDB_FILE_TYPE_META, 0, 0, 0, fname);
if (fname == NULL) return -1;
if (tdCreateKVStore(fname) < 0) {
tsdbError("vgId:%d failed to open KV store since %s", pCfg->tsdbId, tstrerror(terrno));
free(fname);
return -1;
}
free(fname);
return 0;
}
......@@ -541,7 +539,7 @@ static int32_t tsdbSaveConfig(char *rootDir, STsdbCfg *pCfg) {
char buf[TSDB_FILE_HEAD_SIZE] = "\0";
char *pBuf = buf;
tsdbGetFileName(rootDir, TSDB_FILE_TYPE_CFG, 0, 0, 0, &fname);
tsdbGetFileName(rootDir, TSDB_FILE_TYPE_CFG, 0, 0, 0, fname);
fd = open(fname, O_WRONLY | O_CREAT, 0755);
if (fd < 0) {
......@@ -581,7 +579,7 @@ static int tsdbLoadConfig(char *rootDir, STsdbCfg *pCfg) {
int fd = -1;
char buf[TSDB_FILE_HEAD_SIZE] = "\0";
tsdbGetFileName(rootDir, TSDB_FILE_TYPE_CFG, 0, 0, 0, &fname);
tsdbGetFileName(rootDir, TSDB_FILE_TYPE_CFG, 0, 0, 0, fname);
fd = open(fname, O_RDONLY);
if (fd < 0) {
......@@ -769,31 +767,43 @@ static int tsdbRestoreInfo(STsdbRepo *pRepo) {
SFileGroup *pFGroup = NULL;
SFileGroupIter iter;
SRWHelper rhelper = {0};
if (tsdbInitReadHelper(&rhelper, pRepo) < 0) goto _err;
SReadHandle * pReadH = tsdbNewReadHandle(pRepo);
if (pReadH == NULL) return -1;
tsdbInitFileGroupIter(pFileH, &iter, TSDB_ORDER_DESC);
while ((pFGroup = tsdbGetFileGroupNext(&iter)) != NULL) {
if (pFGroup->state) continue;
if (tsdbSetAndOpenHelperFile(&rhelper, pFGroup) < 0) goto _err;
if (tsdbLoadCompIdx(&rhelper, NULL) < 0) goto _err;
if (tsdbSetAndOpenReadFGroup(pReadH, pFGroup) < 0) {
tsdbFreeReadHandle(pReadH);
return -1;
}
if (tsdbLoadBlockIdx(pReadH) < 0) {
tsdbCloseAndUnsetReadFile(pReadH);
tsdbFreeReadHandle(pReadH);
return -1;
}
for (int i = 1; i < pMeta->maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable == NULL) continue;
if (tsdbSetHelperTable(&rhelper, pTable, pRepo) < 0) goto _err;
SBlockIdx *pIdx = &(rhelper.curCompIdx);
if (tsdbSetReadTable(pReadH, pTable) < 0) {
tsdbCloseAndUnsetReadFile(pReadH);
tsdbFreeReadHandle(pReadH);
return -1;
}
if (pReadH->pCurBlockIdx != NULL && pTable->lastKey < pReadH->pCurBlockIdx->maxKey) {
pTable->lastKey = pReadH->pCurBlockIdx->maxKey;
if (pIdx->offset > 0 && pTable->lastKey < pIdx->maxKey) pTable->lastKey = pIdx->maxKey;
}
}
tsdbCloseAndUnsetReadFile(pReadH);
}
tsdbDestroyHelper(&rhelper);
tsdbFreeReadHandle(pReadH);
return 0;
_err:
tsdbDestroyHelper(&rhelper);
return -1;
}
static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
......
......@@ -12,6 +12,8 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <errno.h>
#include <inttypes.h>
#include "tsdb.h"
#include "tsdbMain.h"
......@@ -23,6 +25,7 @@ static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo);
static void tsdbFreeMemTable(SMemTable *pMemTable);
static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable);
static void tsdbFreeTableData(STableData *pTableData);
static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables);
static char * tsdbGetTsTupleKey(const void *data);
// ---------------- INTERNAL FUNCTIONS ----------------
......
......@@ -468,11 +468,11 @@ void tsdbFreeMeta(STsdbMeta *pMeta) {
}
int tsdbOpenMeta(STsdbRepo *pRepo) {
char * fname = NULL;
char fname[TSDB_FILENAME_LEN] = "\0";
STsdbMeta *pMeta = pRepo->tsdbMeta;
ASSERT(pMeta != NULL);
if (tsdbGetFileName(pRepo->rootDir, TSDB_FILE_TYPE_META, 0, 0, 0, &fname) < 0) goto _err;
if (tsdbGetFileName(pRepo->rootDir, TSDB_FILE_TYPE_META, 0, 0, 0, fname) < 0) goto _err;
pMeta->pStore = tdOpenKVStore(fname, tsdbRestoreTable, tsdbOrgMeta, (void *)pRepo);
if (pMeta->pStore == NULL) {
......@@ -481,11 +481,9 @@ int tsdbOpenMeta(STsdbRepo *pRepo) {
}
tsdbDebug("vgId:%d open TSDB meta succeed", REPO_ID(pRepo));
taosTFree(fname);
return 0;
_err:
taosTFree(fname);
return -1;
}
......
......@@ -12,6 +12,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <inttypes.h>
#include "os.h"
#include "tulog.h"
......@@ -114,7 +115,7 @@ typedef struct STsdbQueryHandle {
int32_t type; // query type: retrieve all data blocks, 2. retrieve only last row, 3. retrieve direct prev|next rows
SFileGroup* pFileGroup;
SFileGroupIter fileIter;
SRWHelper rhelper;
SReadHandle* rhelper;
STableBlockInfo* pDataBlockInfo;
SDataCols *pDataCols; // in order to hold current file data block
......@@ -266,7 +267,7 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(TSDB_REPO_T* tsdb, STsdbQueryCond*
pQueryHandle->allocSize = 0;
pQueryHandle->locateStart = false;
if (tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb) != 0) {
if ((pQueryHandle->rhelper = tsdbNewReadHandle((STsdbRepo *)tsdb)) == NULL) {
goto out_of_memory;
}
......@@ -671,15 +672,15 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
pCheckInfo->numOfBlocks = 0;
if (tsdbSetHelperTable(&pQueryHandle->rhelper, pCheckInfo->pTableObj, pQueryHandle->pTsdb) != TSDB_CODE_SUCCESS) {
if (tsdbSetReadTable(pQueryHandle->rhelper, pCheckInfo->pTableObj) < 0) {
code = terrno;
break;
}
SBlockIdx* compIndex = &pQueryHandle->rhelper.curCompIdx;
SBlockIdx* compIndex = pQueryHandle->rhelper->pCurBlockIdx;
// no data block in this file, try next file
if (compIndex->len == 0 || compIndex->numOfBlocks == 0 || compIndex->uid != pCheckInfo->tableId.uid) {
if (compIndex == NULL || compIndex->numOfBlocks == 0 || compIndex->uid != pCheckInfo->tableId.uid) {
continue; // no data blocks in the file belongs to pCheckInfo->pTable
}
......@@ -697,7 +698,9 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
pCheckInfo->compSize = compIndex->len;
}
tsdbLoadCompInfo(&(pQueryHandle->rhelper), (void *)(pCheckInfo->pCompInfo));
if (tsdbLoadBlockInfo(pQueryHandle->rhelper, (void *)(pCheckInfo->pCompInfo)) < 0) {
// TODO: deal with the error here
}
SBlockInfo* pCompInfo = pCheckInfo->pCompInfo;
TSKEY s = TSKEY_INITIAL_VAL, e = TSKEY_INITIAL_VAL;
......@@ -736,7 +739,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
return code;
}
static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo, int32_t slotIndex) {
static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo, int32_t slotIndex) {
int64_t st = taosGetTimestampUs();
STSchema *pSchema = tsdbGetTableSchema(pCheckInfo->pTableObj);
......@@ -747,14 +750,14 @@ static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* p
goto _error;
}
code = tdInitDataCols(pQueryHandle->rhelper.pDataCols[0], pSchema);
code = tdInitDataCols(pQueryHandle->rhelper->pDataCols[0], pSchema);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("%p failed to malloc buf for rhelper.pDataCols[0], %p", pQueryHandle, pQueryHandle->qinfo);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _error;
}
code = tdInitDataCols(pQueryHandle->rhelper.pDataCols[1], pSchema);
code = tdInitDataCols(pQueryHandle->rhelper->pDataCols[1], pSchema);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("%p failed to malloc buf for rhelper.pDataCols[1], %p", pQueryHandle, pQueryHandle->qinfo);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
......@@ -763,7 +766,7 @@ static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* p
int16_t* colIds = pQueryHandle->defaultLoadColumn->pData;
int32_t ret = tsdbLoadBlockDataCols(&(pQueryHandle->rhelper), pBlock, pCheckInfo->pCompInfo, colIds, (int)(QH_GET_NUM_OF_COLS(pQueryHandle)));
int32_t ret = tsdbLoadBlockDataCols(pQueryHandle->rhelper, pBlock, pCheckInfo->pCompInfo, colIds, (int)(QH_GET_NUM_OF_COLS(pQueryHandle)));
if (ret != TSDB_CODE_SUCCESS) {
int32_t c = terrno;
assert(c != TSDB_CODE_SUCCESS);
......@@ -776,7 +779,7 @@ static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* p
pBlockLoadInfo->slot = pQueryHandle->cur.slot;
pBlockLoadInfo->tid = pCheckInfo->pTableObj->tableId.tid;
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
SDataCols* pCols = pQueryHandle->rhelper->pDataCols[0];
assert(pCols->numOfRows != 0 && pCols->numOfRows <= pBlock->numOfRows);
pBlock->numOfRows = pCols->numOfRows;
......@@ -896,7 +899,7 @@ static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBlock,
return code;
}
SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0];
SDataCols* pTSCol = pQueryHandle->rhelper->pDataCols[0];
assert(pTSCol->cols->type == TSDB_DATA_TYPE_TIMESTAMP && pTSCol->numOfRows == pBlock->numOfRows);
if (pCheckInfo->lastKey > pBlock->keyFirst) {
......@@ -919,7 +922,7 @@ static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBlock,
return code;
}
SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0];
SDataCols* pTSCol = pQueryHandle->rhelper->pDataCols[0];
if (pCheckInfo->lastKey < pBlock->keyLast) {
cur->pos = binarySearchForKey(pTSCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pQueryHandle->order);
} else {
......@@ -1004,7 +1007,7 @@ int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity
char* pData = NULL;
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1 : -1;
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
SDataCols* pCols = pQueryHandle->rhelper->pDataCols[0];
TSKEY* tsArray = pCols->cols[0].pData;
int32_t num = end - start + 1;
......@@ -1228,7 +1231,7 @@ static void doCheckGeneratedBlockRange(STsdbQueryHandle* pQueryHandle) {
static void copyAllRemainRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SDataBlockInfo* pBlockInfo, int32_t endPos) {
SQueryFilePos* cur = &pQueryHandle->cur;
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
SDataCols* pCols = pQueryHandle->rhelper->pDataCols[0];
TSKEY* tsArray = pCols->cols[0].pData;
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1;
......@@ -1272,7 +1275,7 @@ int32_t getEndPosInDataBlock(STsdbQueryHandle* pQueryHandle, SDataBlockInfo* pBl
int32_t order = ASCENDING_TRAVERSE(pQueryHandle->order)? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
SQueryFilePos* cur = &pQueryHandle->cur;
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
SDataCols* pCols = pQueryHandle->rhelper->pDataCols[0];
if (ASCENDING_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey >= pBlockInfo->window.ekey) {
endPos = pBlockInfo->rows - 1;
......@@ -1297,7 +1300,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
initTableMemIterator(pQueryHandle, pCheckInfo);
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
SDataCols* pCols = pQueryHandle->rhelper->pDataCols[0];
assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == PRIMARYKEY_TIMESTAMP_COL_INDEX &&
cur->pos >= 0 && cur->pos < pBlock->numOfRows);
......@@ -1681,7 +1684,7 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex
break;
}
if (tsdbSetAndOpenHelperFile(&pQueryHandle->rhelper, pQueryHandle->pFileGroup) < 0) {
if (tsdbSetAndOpenReadFGroup(pQueryHandle->rhelper, pQueryHandle->pFileGroup) < 0) {
pthread_rwlock_unlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock);
code = terrno;
break;
......@@ -1689,7 +1692,7 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex
pthread_rwlock_unlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock);
if (tsdbLoadCompIdx(&pQueryHandle->rhelper, NULL) < 0) {
if (tsdbLoadBlockIdx(pQueryHandle->rhelper) < 0) {
code = terrno;
break;
}
......@@ -2168,7 +2171,9 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta
}
int64_t stime = taosGetTimestampUs();
tsdbLoadCompData(&pHandle->rhelper, pBlockInfo->compBlock, NULL);
if (tsdbLoadBlockDataInfo(pHandle->rhelper, pBlockInfo->compBlock) < 0) {
// TODO: deal with the error here
}
int16_t* colIds = pHandle->defaultLoadColumn->pData;
......@@ -2178,7 +2183,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta
pHandle->statis[i].colId = colIds[i];
}
tsdbGetDataStatis(&pHandle->rhelper, pHandle->statis, (int)numOfCols);
tsdbGetDataStatis(pHandle->rhelper, pHandle->statis, (int)numOfCols);
// always load the first primary timestamp column data
SDataStatis* pPrimaryColStatis = &pHandle->statis[0];
......@@ -2703,7 +2708,7 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
// todo check error
tsdbUnTakeMemSnapShot(pQueryHandle->pTsdb, pQueryHandle->mem, pQueryHandle->imem);
tsdbDestroyHelper(&pQueryHandle->rhelper);
tsdbFreeReadHandle(pQueryHandle->rhelper);
tdFreeDataCols(pQueryHandle->pDataCols);
pQueryHandle->pDataCols = NULL;
......
......@@ -14,6 +14,7 @@
*/
#include <errno.h>
#include <fcntl.h>
#include <inttypes.h>
#include <sys/stat.h>
#include <sys/types.h>
......@@ -31,8 +32,7 @@ static int tsdbDecodeBlockIdxArray(SReadHandle *pReadH);
static int tsdbVerifyBlockInfo(SBlockInfo *pBlockInfo, SBlockIdx *pBlockIdx);
static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32_t len, int8_t comp, int numOfRows,
int maxPoints, char *buffer, int bsize);
static int tsdbLoadColData(SReadHandle *pReadH, SFile *pFile, SBlock *pBlock, SBlockCol *pBlockCol,
SDataCol *pDataCol);
static int tsdbLoadColData(SReadHandle *pReadH, SFile *pFile, SBlock *pBlock, SBlockCol *pBlockCol, SDataCol *pDataCol);
SReadHandle *tsdbNewReadHandle(STsdbRepo *pRepo) {
SReadHandle *pReadH = (SReadHandle *)calloc(1, sizeof(*pReadH));
......@@ -61,6 +61,7 @@ SReadHandle *tsdbNewReadHandle(STsdbRepo *pRepo) {
void tsdbFreeReadHandle(SReadHandle *pReadH) {
if (pReadH) {
tsdbCloseAndUnsetReadFile(pReadH);
taosTZfree(pReadH->pBlockIdx);
taosTZfree(pReadH->pBlockInfo);
taosTZfree(pReadH->pBlockData);
......@@ -79,7 +80,7 @@ int tsdbSetAndOpenReadFGroup(SReadHandle *pReadH, SFileGroup *pFGroup) {
pReadH->fGroup = *pFGroup;
tsdbResetFGroupFd(&(pReadH->fGroup));
tsdbCloseAndUnsetReadFile(pReadH);
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
SFile *pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), type);
......@@ -123,7 +124,7 @@ int tsdbLoadBlockIdx(SReadHandle *pReadH) {
ASSERT(pFile->info.size == pFile->info.offset + pFile->info.len);
if (tsdbAllocBuf(&(pReadH->pBuf), pFile->info.len) < 0) {
if (tsdbAllocBuf((void **)(&(pReadH->pBuf)), pFile->info.len) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
......@@ -207,7 +208,7 @@ int tsdbSetReadTable(SReadHandle *pReadH, STable *pTable) {
return 0;
}
int tsdbLoadBlockInfo(SReadHandle *pReadH) {
int tsdbLoadBlockInfo(SReadHandle *pReadH, void *pMem) {
ASSERT(pReadH != NULL);
if (pReadH->pCurBlockIdx == NULL) return 0;
......@@ -218,7 +219,7 @@ int tsdbLoadBlockInfo(SReadHandle *pReadH) {
ASSERT(pFile->fd > 0 && pBlockIdx->len > 0);
if (tsdbAllocBuf(&((void *)(pReadH->pBlockInfo)), pBlockIdx->len) < 0) {
if (pMem == NULL && tsdbAllocBuf((void **)(&(pReadH->pBlockInfo)), pBlockIdx->len) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
......@@ -230,7 +231,9 @@ int tsdbLoadBlockInfo(SReadHandle *pReadH) {
return -1;
}
ssize_t ret = taosTRead(pFile->fd, (void *)(pReadH->pBlockInfo), pBlockIdx->len);
if (pMem == NULL) pMem = (void *)(pReadH->pBlockInfo);
ssize_t ret = taosTRead(pFile->fd, pMem, pBlockIdx->len);
if (ret < 0) {
tsdbError("vgId:%d failed to read block info part of table %s from file %s since %s", REPO_ID(pRepo),
TABLE_CHAR_NAME(pReadH->pTable), pFile->fname, strerror(errno));
......@@ -238,7 +241,7 @@ int tsdbLoadBlockInfo(SReadHandle *pReadH) {
return -1;
}
if (ret < pBlockIdx->len || tsdbVerifyBlockInfo(pReadH->pBlockInfo, pBlockIdx) < 0) {
if (ret < pBlockIdx->len || tsdbVerifyBlockInfo((SBlockInfo *)pMem, pBlockIdx) < 0) {
tsdbError("vgId:%d table %s block info part is corrupted in file %s", REPO_ID(pRepo),
TABLE_CHAR_NAME(pReadH->pTable), pFile->fname);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
......@@ -311,28 +314,28 @@ int tsdbLoadBlockDataInfo(SReadHandle *pReadH, SBlock *pBlock) {
if (lseek(pFile->fd, pBlock->offset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to lseek file %s to offset %" PRId64 " since %s", REPO_ID(pRepo), pFile->fname,
pBlock->offset, strerror(errno));
(int64_t)pBlock->offset, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
int tsize = TSDB_BLOCK_DATA_LEN(pBlock->numOfCols);
if (tsdbAllocBuf(&(pReadH->pBlockData), tsize) < 0) {
if (tsdbAllocBuf((void **)(&(pReadH->pBlockData)), tsize) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
int ret = taosTRead(pFile->fd, tsize);
int ret = taosTRead(pFile->fd, (void *)pReadH->pBlockData, tsize);
if (ret < 0) {
tsdbError("vgId:%d failed to read block data info part from file %s offset %" PRId64 " len %d since %s",
REPO_ID(pRepo), pFile->fname, pBlock->offset, tsize, strerror(errno));
REPO_ID(pRepo), pFile->fname, (int64_t)pBlock->offset, tsize, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (ret < tsize || !taosCheckChecksumWhole((uint8_t *)(pReadH->pBlockData), tsize)) {
tsdbError("vgId:%d block data info part is corrupted in file %s offset %" PRId64 " len %d", REPO_ID(pRepo),
pFile->fname, pBlock->offset, tsize);
pFile->fname, (int64_t)pBlock->offset, tsize);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1;
}
......@@ -349,6 +352,34 @@ int tsdbLoadKeyCol(SReadHandle *pReadH, SBlockInfo *pBlockInfo, SBlock *pBlock)
return tsdbLoadBlockDataCols(pReadH, pBlock, pBlockInfo, &colId, 1);
}
void tsdbGetDataStatis(SReadHandle *pReadH, SDataStatis *pStatis, int numOfCols) {
SBlockData *pBlockData = pReadH->pBlockData;
for (int i = 0, j = 0; i < numOfCols;) {
if (j >= pBlockData->numOfCols) {
pStatis[i].numOfNull = -1;
i++;
continue;
}
if (pStatis[i].colId == pBlockData->cols[j].colId) {
pStatis[i].sum = pBlockData->cols[j].sum;
pStatis[i].max = pBlockData->cols[j].max;
pStatis[i].min = pBlockData->cols[j].min;
pStatis[i].maxIndex = pBlockData->cols[j].maxIndex;
pStatis[i].minIndex = pBlockData->cols[j].minIndex;
pStatis[i].numOfNull = pBlockData->cols[j].numOfNull;
i++;
j++;
} else if (pStatis[i].colId < pBlockData->cols[j].colId) {
pStatis[i].numOfNull = -1;
i++;
} else {
j++;
}
}
}
static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols *pDataCols) {
ASSERT(pBlock->numOfSubBlocks <= 1);
......@@ -360,14 +391,14 @@ static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols
pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_DATA);
}
if (tsdbAllocBuf(&(pReadH->pBuf), pBlock->len) < 0) {
if (tsdbAllocBuf((void **)(&(pReadH->pBuf)), pBlock->len) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
if (lseek(pFile->fd, pBlock->offset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to lseek file %s to offset %" PRId64 " since %s", REPO_ID(pRepo), pFile->fname,
pBlock->offset, strerror(errno));
(int64_t)pBlock->offset, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
......@@ -375,7 +406,7 @@ static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols
int ret = taosTRead(pFile->fd, (void *)(pReadH->pBuf), pBlock->len);
if (ret < 0) {
tsdbError("vgId:%d failed to read block data part from file %s at offset %" PRId64 " len %d since %s",
REPO_ID(pRepo), pFile->fname, pBlock->offset, pBlock->len, strerror(errno));
REPO_ID(pRepo), pFile->fname, (int64_t)pBlock->offset, pBlock->len, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
......@@ -383,7 +414,7 @@ static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols
int tsize = TSDB_BLOCK_DATA_LEN(pBlock->numOfCols);
if (ret < pBlock->len || !taosCheckChecksumWhole((uint8_t *)(pReadH->pBuf), tsize)) {
tsdbError("vgId:%d block data part from file %s at offset %" PRId64 " len %d is corrupted", REPO_ID(pRepo),
pFile->fname, pBlock->offset, pBlock->len);
pFile->fname, (int64_t)pBlock->offset, pBlock->len);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1;
}
......@@ -391,7 +422,7 @@ static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols
SBlockData *pBlockData = (SBlockData *)pReadH->pBuf;
ASSERT(pBlockData->delimiter == TSDB_FILE_DELIMITER);
ASSERT(pBlockData->numOfCols = pBlock->numOfCols);
ASSERT(pBlockData->numOfCols == pBlock->numOfCols);
tdResetDataCols(pDataCols);
ASSERT(pBlock->numOfRows <= pDataCols->maxPoints);
......@@ -427,7 +458,7 @@ static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols
zsize += (sizeof(VarDataLenT) * pBlock->numOfRows);
}
if (tsdbAllocBuf(&(pReadH->pCBuf), zsize) < 0) {
if (tsdbAllocBuf((void **)(&(pReadH->pCBuf)), zsize) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
......@@ -540,12 +571,13 @@ static int tsdbLoadBlockDataColsImpl(SReadHandle *pReadH, SBlock *pBlock, SDataC
}
static int tsdbDecodeBlockIdxArray(SReadHandle *pReadH) {
void * pBuf = pReadH->pBuf;
SFile *pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_HEAD);
void * pBuf = pReadH->pBuf;
STsdbRepo *pRepo = pReadH->pRepo;
SFile * pFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_HEAD);
pReadH->nBlockIdx = 0;
while (POINTER_DISTANCE(pBuf, pReadH->pBuf) < (int)(pFile->info.len - sizeof(TSCKSUM))) {
if (tsdbAllocBuf(&((void *)(pReadH->pBlockIdx)), sizeof(SBlockIdx) * (pReadH->nBlockIdx + 1)) < 0) {
if (tsdbAllocBuf((void **)(&(pReadH->pBlockIdx)), sizeof(SBlockIdx) * (pReadH->nBlockIdx + 1)) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
......@@ -559,7 +591,7 @@ static int tsdbDecodeBlockIdxArray(SReadHandle *pReadH) {
}
pReadH->nBlockIdx++;
ASSERT(pReadH->nBlockIdx == 1 || (pReadH->pBlockIdx[pReadH->nBlockIdx-1].tid < (pReadH->pBlockIdx[pReadH->nBlockIdx-2].tid));
ASSERT(pReadH->nBlockIdx == 1 || (pReadH->pBlockIdx[pReadH->nBlockIdx-1].tid < pReadH->pBlockIdx[pReadH->nBlockIdx-2].tid));
}
ASSERT(pReadH->nBlockIdx > 0);
......@@ -616,7 +648,7 @@ static int tsdbLoadColData(SReadHandle *pReadH, SFile *pFile, SBlock *pBlock, SB
STsdbRepo *pRepo = pReadH->pRepo;
STsdbCfg * pCfg = &(pRepo->config);
if (tsdbAllocBuf(&(pReadH->pBuf), pBlockCol->len) < 0) {
if (tsdbAllocBuf((void **)(&(pReadH->pBuf)), pBlockCol->len) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
......@@ -643,7 +675,7 @@ static int tsdbLoadColData(SReadHandle *pReadH, SFile *pFile, SBlock *pBlock, SB
zsize += (sizeof(VarDataLenT) * pBlock->numOfRows);
}
if (tsdbAllocBuf(&(pReadH->pCBuf), zsize) < 0) {
if (tsdbAllocBuf((void **)(&(pReadH->pCBuf)), zsize) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
......
......@@ -17,37 +17,28 @@
#include "tsdbMain.h"
#define TSDB_DATA_DIR_NAME "data"
const char *tsdbFileSuffix[] = {".head", ".data", ".last", ".manifest", "meta", "config"};
int tsdbGetFileName(char *rootDir, int type, int vid, int fid, int seq, char **fname) {
if (*fname == NULL) {
*fname = (char *)malloc(TSDB_FILENAME_LEN);
if (*fname == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
}
int tsdbGetFileName(char *rootDir, int type, int vid, int fid, int seq, char *fname) {
switch (type) {
case TSDB_FILE_TYPE_HEAD:
case TSDB_FILE_TYPE_DATA:
case TSDB_FILE_TYPE_LAST:
if (seq == 0) { // For backward compatibility
snprintf(*fname, TSDB_FILENAME_LEN, "%s/%s/v%df%d%s", rootDir, TSDB_DATA_DIR_NAME, vid, fid,
snprintf(fname, TSDB_FILENAME_LEN, "%s/%s/v%df%d%s", rootDir, TSDB_DATA_DIR_NAME, vid, fid,
tsdbFileSuffix[type]);
} else {
snprintf(*fname, TSDB_FILENAME_LEN, "%s/%s/v%df%d%s-%d", rootDir, TSDB_DATA_DIR_NAME, vid, fid,
snprintf(fname, TSDB_FILENAME_LEN, "%s/%s/v%df%d%s-%d", rootDir, TSDB_DATA_DIR_NAME, vid, fid,
tsdbFileSuffix[type], seq);
}
break;
case TSDB_FILE_TYPE_MANIFEST:
snprintf(*fname, TSDB_FILENAME_LEN, "%s/v%d%s", rootDir, vid, tsdbFileSuffix[type]);
snprintf(fname, TSDB_FILENAME_LEN, "%s/v%d%s", rootDir, vid, tsdbFileSuffix[type]);
break;
case TSDB_FILE_TYPE_META:
case TSDB_FILE_TYPE_CFG:
snprintf(*fname, TSDB_FILENAME_LEN, "%s/%s", rootDir, tsdbFileSuffix[type]);
snprintf(fname, TSDB_FILENAME_LEN, "%s/%s", rootDir, tsdbFileSuffix[type]);
break;
default:
ASSERT(0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册